.net System.Reactive中的并发订户执行

g6ll5ycj  于 2022-12-27  发布在  .NET
关注(0)|答案(2)|浏览(116)

我正在编写一个每Y秒处理X个未完成操作的批处理管道。感觉System.Reactive很适合这个任务,但我无法让订阅者并行执行。我的代码如下所示:

var subject = new Subject<int>();

var concurrentCount = 0;

using var reader = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .Subscribe(list => 
    {
        var c = Interlocked.Increment(ref concurrentCount);
        if (c > 1) Console.WriteLine("Executing {0} simultaneous batches", c); // This never gets printed, because Subscribe is only ever called on a single thread.
        Interlocked.Decrement(ref concurrentCount);
    });
    
Parallel.For(0, 1_000_000, i =>
{
    subject.OnNext(i);
 });
subject.OnCompleted();

有没有一种优雅的方法可以并发地读取这个缓冲的Subject

tf7tbtn2

tf7tbtn21#

Rx订阅代码始终是同步的¹。您需要做的是从Subscribe委托中删除处理代码,并使其成为可观察序列的副作用。以下是如何完成的:

Subject<int> subject = new();
int concurrentCount = 0;

Task processor = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .Select(list => Observable.Defer(() => Observable.Start(() =>
    {
        int c = Interlocked.Increment(ref concurrentCount);
        if (c > 1) Console.WriteLine($"Executing {c} simultaneous batches");
        Interlocked.Decrement(ref concurrentCount);
    })))
    .Merge(maxConcurrent: 2)
    .DefaultIfEmpty() // Prevents exception in corner case (empty source)
    .ToTask(); // or RunAsync (either one starts the processor)

for (int i = 0; i < 1_000_000; i++)
{
    subject.OnNext(i);
}
subject.OnCompleted();

processor.Wait();

Select + Observable.Defer + Observable.Start组合将源序列转换为IObservable<IObservable<Unit>>。它是一个嵌套序列,每个内部序列代表一个list的处理。当Observable.Start的委托完成时,内部序列发出Unit值然后完成。 Package Defer操作符确保内部序列是“冷的”,以便在订阅它们之前不会启动它们。然后执行Merge运算符,其将外部序列展开为平坦的IObservable<Unit>序列。maxConcurrent参数配置将同时预订多少内部序列。每次Merge操作符订阅内部序列时,对应的Observable.Start委托就开始在ThreadPool线程上运行。
如果将maxConcurrent设置得太高,ThreadPool可能会耗尽工作进程(换句话说,它可能会饱和),代码的并发性将依赖于ThreadPool的可用性。如果您愿意,可以根据需要增加ThreadPool立即创建的工作线程的数量,但是,如果您的工作负载是CPU受限的,并且您将工作线程增加到Environment.ProcessorCount值以上,那么很可能您的CPU将饱和。
如果您的工作负载是异步的,可以用Observable.FromAsync操作符替换Observable.Defer + Observable.Start组合,如下所示。
¹ unpublishedAsyncRx.NET采用了异步订阅的思想,它基于新的接口IAsyncObservable<T>IAsyncObserver<T>

nsc4cvqm

nsc4cvqm2#

你这样说:

// This never gets printed, because Subscribe is only ever called on a single thread.

这是不正确的。没有打印的原因是因为Subscribe中的代码以锁定的方式发生-在Subscribe中一次只有一个线程执行,所以你几乎是立即递增值然后递减它。而且因为它从零开始,所以它永远没有机会超过1
现在这只是因为Rx契约,一次只能订阅一个线程。
我们可以弥补的。
请尝试以下代码:

using var reader = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .SelectMany(list =>
        Observable
            .Start(() =>
            {
                var c = Interlocked.Increment(ref concurrentCount);
                Console.WriteLine("Starting {0} simultaneous batches", c);
            })
            .Finally(() =>
            {
                var c = Interlocked.Decrement(ref concurrentCount);
                Console.WriteLine("Ending {0} simultaneous batches", c);
            }))
    .Subscribe();

现在,当我运行它时(迭代次数少于您设置的1_000_000),我得到如下输出:

Starting 1 simultaneous batches
Starting 4 simultaneous batches
Ending 3 simultaneous batches
Ending 2 simultaneous batches
Starting 3 simultaneous batches
Starting 3 simultaneous batches
Ending 1 simultaneous batches
Ending 2 simultaneous batches
Starting 4 simultaneous batches
Starting 5 simultaneous batches
Ending 3 simultaneous batches
Starting 2 simultaneous batches
Starting 2 simultaneous batches
Ending 2 simultaneous batches
Starting 3 simultaneous batches
Ending 0 simultaneous batches
Ending 4 simultaneous batches
Ending 1 simultaneous batches
Starting 1 simultaneous batches
Starting 1 simultaneous batches
Ending 0 simultaneous batches
Ending 0 simultaneous batches

相关问题