.net 将Task[]任务转换为IEnumerable〈Task>时遇到的问题< T>

p8ekf7hl  于 2023-04-08  发布在  .NET
关注(0)|答案(1)|浏览(179)

错误任务[]任务转换IEnumerable〈任务〉

//await Task.WhenAll(WithMaxConcurrency(tasks,3)); // getting error at tasks conversion
var tasks = new Func<Task>[]
        {
            () => jack(),
            () => roy(),
            () => sam()
        };

        await Task.WhenAll(WithMaxConcurrency(tasks,3)); // getting error at tasks conversion
private static IEnumerable<Task<T>> WithMaxConcurrency<T>(IEnumerable<Task<T>> tasks, int maxParallelism)
    {
        SemaphoreSlim maxOperations = new SemaphoreSlim(maxParallelism);
        // The original tasks get wrapped in a new task that must first await a semaphore before the original task is called.
        return tasks.Select(task => maxOperations.WaitAsync().ContinueWith(_ =>
        {
            try { return task; }
            finally { maxOperations.Release(); }
        }).Unwrap());
    }

要将Task[]任务放入IEnumerable〈Task〉

gk7wooem

gk7wooem1#

问题似乎是如何限制任务的执行。目前还不清楚这些任务是做什么的。.NET具有高级功能,允许在管道等中并发处理大量数据。一种选择是使用Parallel.ForEachAsync使用特定数量的worker来处理数据流

使用Parallel.ForEachAsync

This example展示了如何检索Github用户bios,一次3个:

var userHandlers = new []
{
    "users/okyrylchuk",
    "users/shanselman",
    "users/jaredpar",
    "users/davidfowl"
};
 

ParallelOptions parallelOptions = new()
{
    MaxDegreeOfParallelism = 3
};
 
await Parallel.ForEachAsync(userHandlers, parallelOptions, async (uri, token) =>
{
    var user = await client.GetFromJsonAsync<GitHubUser>(uri, token);

    Console.WriteLine($"Name: {user.Name}\nBio: {user.Bio}\n");
});

这也可以用来处理一个Func<Task>调用的集合,充当一个作业队列。但这不如处理一个输入流有用:

var funcs= new Func<Task>[]
        {
            () => jack(),
            () => roy(),
            () => sam()
        };
 

ParallelOptions parallelOptions = new()
{
    MaxDegreeOfParallelism = 3
};
 
await Parallel.ForEachAsync(funcs, parallelOptions, async (func, _) =>
{
    await func();
});

Parallel.ForEachAsync不返回任何结果。委托必须将它们存储在例如ConcurrentQueue中:

var results=ConcurrentQueue<Whatever>();

await Parallel.ForEachAsync(funcs, parallelOptions, async (func, _) =>
{
    var result=await func();
    results.Enqueue(result);
});

使用数据流块

另一种选择是使用Dataflow块,例如ActionBlock来处理具有固定DOP的数据/调用:

var dop=new ExecutionDataflowBlockOptions
         {
            MaxDegreeOfParallelism = 3
         };

var downloader=new ActionBlock<string>(async uri=>{
    var user = await client.GetFromJsonAsync<GitHubUser>(uri);
    Console.WriteLine($"Name: {user.Name}\nBio: {user.Bio}\n");
},dop);

foreach(var uri in userHandlers)
{
    await downloader.SendAsync(uri);
}
downloader.Complete();

await downloader.Completion;

块可以返回结果,并组合到处理步骤的管道中:

var downloader=new TransformBlock<Uri,FileInfo>(DownloadToCsv,dop);
var importer=new ActionBlock<FileInfo>(Importer);

downloader.LinkTo(importer,new DataflowLinkOptions {PropagateCompletion=true});

foreach(var uri in fileUris)
{
    await downloader.SendAsync(uri);
}
downloader.Complete();

await importer.Completion;

在本例中,downloader块一次检索3个文件,并将FileInfo对象发送到Importer块。该块一次将一个文件导入数据库。所有块并发工作,同时下载和导入数据。
当所有文件都是请求时,downloader.Complete()告诉头块我们完成了。
数据流块也可以用作异步工作队列,但这并不像正确使用它们那样灵活:

var queue=new TransformBlock<Func<Task<Whatever>>,Whatever>(func=>func(),dop);
var buffer=new BufferBlock<Whatever>();

queue.LinkTo(buffer);

foreach(var func in funcs)
{
    await queue.SendAsync(func);
}
queue.Complete();
await queue.Completion;

if (buffer.TryReceiveAll(out var results))
{
   //Use the results
}

相关问题