我正在编写一个每Y秒处理X个未完成操作的批处理管道。感觉System.Reactive
很适合这个任务,但我无法让订阅者并行执行。我的代码如下所示:
var subject = new Subject<int>();
var concurrentCount = 0;
using var reader = subject
.Buffer(TimeSpan.FromSeconds(1), 100)
.Subscribe(list =>
{
var c = Interlocked.Increment(ref concurrentCount);
if (c > 1) Console.WriteLine("Executing {0} simultaneous batches", c); // This never gets printed, because Subscribe is only ever called on a single thread.
Interlocked.Decrement(ref concurrentCount);
});
Parallel.For(0, 1_000_000, i =>
{
subject.OnNext(i);
});
subject.OnCompleted();
有没有一种优雅的方法可以并发地读取这个缓冲的Subject
?
2条答案
按热度按时间tf7tbtn21#
Rx订阅代码始终是同步的¹。您需要做的是从
Subscribe
委托中删除处理代码,并使其成为可观察序列的副作用。以下是如何完成的:Select
+Observable.Defer
+Observable.Start
组合将源序列转换为IObservable<IObservable<Unit>>
。它是一个嵌套序列,每个内部序列代表一个list
的处理。当Observable.Start
的委托完成时,内部序列发出Unit
值然后完成。 PackageDefer
操作符确保内部序列是“冷的”,以便在订阅它们之前不会启动它们。然后执行Merge
运算符,其将外部序列展开为平坦的IObservable<Unit>
序列。maxConcurrent
参数配置将同时预订多少内部序列。每次Merge
操作符订阅内部序列时,对应的Observable.Start
委托就开始在ThreadPool
线程上运行。如果将
maxConcurrent
设置得太高,ThreadPool
可能会耗尽工作进程(换句话说,它可能会饱和),代码的并发性将依赖于ThreadPool
的可用性。如果您愿意,可以根据需要增加ThreadPool
立即创建的工作线程的数量,但是,如果您的工作负载是CPU受限的,并且您将工作线程增加到Environment.ProcessorCount
值以上,那么很可能您的CPU将饱和。如果您的工作负载是异步的,可以用
Observable.FromAsync
操作符替换Observable.Defer
+Observable.Start
组合,如下所示。¹ unpublished库AsyncRx.NET采用了异步订阅的思想,它基于新的接口
IAsyncObservable<T>
和IAsyncObserver<T>
。nsc4cvqm2#
你这样说:
这是不正确的。没有打印的原因是因为
Subscribe
中的代码以锁定的方式发生-在Subscribe
中一次只有一个线程执行,所以你几乎是立即递增值然后递减它。而且因为它从零开始,所以它永远没有机会超过1
。现在这只是因为Rx契约,一次只能订阅一个线程。
我们可以弥补的。
请尝试以下代码:
现在,当我运行它时(迭代次数少于您设置的
1_000_000
),我得到如下输出: