.net 如何使用SqlDataReader返回和使用IAsyncEnumerable

brc7rcf0  于 2023-03-31  发布在  .NET
关注(0)|答案(6)|浏览(175)

请看下面的两个方法。第一个返回一个IAsyncEnumerable。第二个尝试使用它。

using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class SqlUtility
{
    public static async IAsyncEnumerable<IDataRecord> GetRecordsAsync(
        string connectionString, SqlParameter[] parameters, string commandText,
        [EnumeratorCancellation]CancellationToken cancellationToken)
    {
        using (SqlConnection connection = new SqlConnection(connectionString))
        {
            await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
            using (SqlCommand command = new SqlCommand(commandText, connection))
            {
                command.Parameters.AddRange(parameters);
                using (var reader = await command.ExecuteReaderAsync()
                    .ConfigureAwait(false))
                {
                    while (await reader.ReadAsync().ConfigureAwait(false))
                    {
                        yield return reader;
                    }
                }
            }
        }
    }

    public static async Task Example()
    {
        const string connectionString =
            "Server=localhost;Database=[Redacted];Integrated Security=true";
        SqlParameter[] parameters = new SqlParameter[]
        {
            new SqlParameter("VideoID", SqlDbType.Int) { Value = 1000 }
        };
        const string commandText = "select * from Video where VideoID=@VideoID";
        IAsyncEnumerable<IDataRecord> records = GetRecordsAsync(connectionString,
            parameters, commandText, CancellationToken.None);
        IDataRecord firstRecord = await records.FirstAsync().ConfigureAwait(false);
        object videoID = firstRecord["VideoID"]; //Should be 1000.
        // Instead, I get this exception:
        // "Invalid attempt to call MetaData when reader is closed."
    }
}

当代码试图读取结果IDataReader(在object videoID = firstRecord["VideoID"];)时,我得到了这个异常:
读取器关闭时调用元数据的尝试无效。
这是因为SqlDataReader被释放了。有人能提供一个推荐的方法来以异步的方式枚举SqlDataReader,以便每个结果记录都可用于调用方法吗?谢谢。

50few1ms

50few1ms1#

在这种情况下,LINQ不是您的朋友,因为FirstAsync将在 * 返回结果之前关闭迭代器,这不是ADO .NET所期望的;基本上:不要在这里使用LINQ,或者至少:你可以使用Select之类的工具来执行投影 *,而序列仍然是打开的 *,或者将所有的工作卸载给Dapper之类的工具会更容易,或者手动执行:

await foreach (var record in records)
{
    // TODO: process record
    // (perhaps "break"), because you only want the first
}
qf9go6mv

qf9go6mv2#

你可以通过不返回依赖于连接仍然打开的对象来避免这种情况。例如,如果你只需要VideoID,那么只返回它(我假设它是int):

public static async IAsyncEnumerable<int> GetRecordsAsync(string connectionString, SqlParameter[] parameters, string commandText, [EnumeratorCancellation]CancellationToken cancellationToken)
{
    ...
                    yield return reader["VideoID"];
    ...
}

或者投影到你自己的类中:

public class MyRecord {
    public int VideoId { get; set; }
}

public static async IAsyncEnumerable<MyRecord> GetRecordsAsync(string connectionString, SqlParameter[] parameters, string commandText, [EnumeratorCancellation]CancellationToken cancellationToken)
{
    ...
                    yield return new MyRecord {
                        VideoId = reader["VideoID"]
                    }
    ...
}

或者按照Marc的建议,在第一个之后使用foreachbreak,在您的情况下看起来像这样:

IAsyncEnumerable<IDataRecord> records = GetRecordsAsync(connectionString, parameters, commandText, CancellationToken.None);
object videoID;
await foreach (var record in records)
{
    videoID = record["VideoID"];
    break;
}
zy1mlcev

zy1mlcev3#

当你公开一个打开的DataReader时,关闭它沿着底层Connection的责任现在属于调用者,所以你不应该释放任何东西。相反,你应该使用DbCommand.ExecuteReaderAsync重载,它接受CommandBehavior参数,并传递CommandBehavior.CloseConnection值:
执行该命令时,关联的Connection对象将在关联的DataReader对象关闭时关闭。
然后你就可以希望调用者会遵守规则,及时调用DataReader.Close方法,并且在对象被垃圾收集之前不会让连接打开。因此,公开一个打开的DataReader应该被认为是一种极端的性能优化技术,应该谨慎使用。
顺便说一句,如果你返回一个IEnumerable<IDataRecord>而不是一个IAsyncEnumerable<IDataRecord>,你也会遇到同样的问题。

vzgqcmou

vzgqcmou4#

要添加到其他答案中,您可以使您的实用程序方法成为泛型,并添加投影委托Func<IDataRecord, T> projection作为参数,如下所示:

public static async IAsyncEnumerable<T> GetRecordsAsync<T>(
    string connectionString, SqlParameter[] parameters, string commandText,
    Func<IDataRecord, T> projection, // Parameter here
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    ...
                    yield return projection(reader); // Projected here
    ...
}

然后在调用时传入一个lambda或引用一个方法组,例如:

public static object GetVideoId(IDataRecord dataRecord)
    => dataRecord["VideoID"];

例如:

GetRecordsAsync(connectionString, parameters, commandText, GetVideoId, CancellationToken.None);
5gfr0r5j

5gfr0r5j5#

在2021年年底的时候,我有一个确切的问题。我找不到一个完整的例子,所以我只是在我能找到的东西周围乱转,直到我找到一些工作。
下面是我的代码完整,虽然很简单(所以你可以稍后扩展它)的例子,沿着一些注解,详细说明了我在这个过程中遇到的一些问题:

// This function turns each "DataRow" into an object of Type T and yields
// it. You could alternately yield the reader itself for each row.
// In this example, assume sqlCommandText and connectionString exist.
public async IAsyncEnumerable<T> ReadAsync<T>( Func<SqlDataReader, T> func )
{
    // we need a connection that will last as long as the reader is open,
    // alternately you could pass in an open connection.
    using SqlConnection connection = new SqlConnection( connectionString );
    using SqlCommand cmd = new SqlCommand( sqlCommandText, connection );

    await connection.OpenAsync();
    var reader = await cmd.ExecuteReaderAsync();
    while( await reader.ReadAsync() )
    {
        yield return func( reader );
    }
}

然后,在(异步)代码的任何其他部分,都可以在await foreach循环中调用函数:

private static async Task CallIAsyncEnumerable()
{
    await foreach( var category in ReadAsync( ReaderToCategory ) )
    {
        // do something with your category; save it in a list, write it to disk,
        // make an HTTP call ... the whole world is yours!
    }
}

// an example delegate, which I'm passing into ReadAsync
private static Category ReaderToCategory( SqlDataReader reader )
{
    return new Category()
    {
        Code = ( string )reader[ "Code" ],
        Group = ( string )reader[ "Group" ]
    };
}

我还发现了几件事:你不能在try中使用yield,但是你可以把cmd.ExecuteReaderAsync()之前的所有东西都塞进try,或者一个单独的方法来返回DataReader。或者你可以把await foreach Package 在try块中;我认为问题在于屈服于尝试之外的调用者(这是有道理的,在你考虑之后)。
如果你使用另一个方法来生成读取器,* 将连接传递给那个方法 *,这样你就可以控制它的生存期。如果你的方法创建了连接,执行了命令,并返回了SqlDataReader,那么在你从读取器读取之前,连接就会关闭(如果你使用了'using')。同样,如果你仔细想想,这是完全有道理的,但是它让我困惑了几分钟。
祝你好运,我希望这对将来的别人有帮助!

ndasle7k

ndasle7k6#

我推荐这样的东西:

public async IAsyncEnumerable<Customer> GetByCity(string city)
{
   const string sql = "SELECT * FROM Customers WHERE City = @City";
    
   using var command = connection.CreateCommand();
   command.CommandText = sql;
   command.Parameters.AddWithValue("@City", city);
    
   if (connection.State == ConnectionState.Closed)
     await connection.OpenAsync();
    
   using SqlDataReader reader = await command.ExecuteReaderAsync();
    
    while (await reader.ReadAsync())
    {
        yield return Map(reader);
    }
}
    
private static Customer Map(SqlDataReader reader) => new Customer
{
   FirstName = (string) reader["FirstName"],
   LastName  = (string) reader["LastName"]
}
await foreach (var customer in customerRepository.GetByCity("Warsaw"))
{
   // ...
}

相关问题