我正在尝试使用Reactive Extensions for .NET重写一些代码,但是我需要一些关于如何实现我的目标的指导。
我有一个类,它在一个低级库中封装了一些异步行为。考虑一下读取或写入网络的东西。当类启动时,它将尝试连接到环境,当成功时,它将通过从工作线程调用来返回信号。
我想把这个异步行为转变成一个同步调用,我已经创建了一个非常简化的例子,下面是如何实现的:
ManualResetEvent readyEvent = new ManualResetEvent(false);
public void Start(TimeSpan timeout) {
// Simulate a background process
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Wait for startup to complete.
if (!this.readyEvent.WaitOne(timeout))
throw new TimeoutException();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay); // Simulate startup delay.
this.readyEvent.Set();
}
在工作线程上运行AsyncStart
只是模拟库的异步行为的一种方式,而不是我的真实的代码的一部分,在实际代码中,低级库提供线程并在回调上调用我的代码。
请注意,如果start没有在超时间隔内完成,Start
方法将抛出TimeoutException
。
我想重写这段代码来使用Rx。下面是我的第一次尝试:
Subject<Unit> readySubject = new Subject<Unit>();
public void Start(TimeSpan timeout) {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point A - see below
this.readySubject.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
这是一个不错的尝试,但不幸的是它包含一个竞态条件。如果启动完成 * 快 *(例如,如果delay
为0),并且如果在点A处存在附加延迟,则在First
执行之前,将在readySubject
上调用OnNext
。m应用Timeout
和First
时,不会看到启动已经完成,而是会抛出一个TimeoutException
。Observable.Defer
似乎是为了处理这样的问题而创建的。下面是使用Rx的稍微复杂一点的尝试:
Subject<Unit> readySubject = new Subject<Unit>();
void Start(TimeSpan timeout) {
var ready = Observable.Defer(() => {
ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
// Point B - see below
return this.readySubject.AsObservable();
});
ready.Timeout(timeout).First();
}
void AsyncStart(TimeSpan delay) {
Thread.Sleep(delay);
this.readySubject.OnNext(new Unit());
}
现在,异步操作不会立即启动,而是仅在使用IObservable
时启动。遗憾的是,仍然存在争用条件,但这次是在B点。如果异步操作在Defer
lambda返回之前启动调用OnNext
,则它仍然丢失,并且Timeout
将抛出TimeoutException
。
我知道我可以使用像Replay
这样的操作符来缓冲事件,但是我的初始示例没有使用任何类型的缓冲。有没有办法让我使用Rx来解决我的问题而不出现争用条件?本质上,只有在IObservable
连接到Timeout
和First
之后才开始异步操作?
根据Ana Betts的回答,以下是可行的解决方案:
void Start(TimeSpan timeout) {
var readySubject = new AsyncSubject<Unit>();
ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
// Point C - see below
readySubject.Timeout(timeout).First();
}
void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
Thread.Sleep(delay);
readySubject.OnNext(new Unit());
readySubject.OnCompleted();
}
有趣的是,当C点的延迟比AsyncStart
完成所需的时间长时,AsyncSubject
会保留最后发送的通知,Timeout
和First
仍将按预期执行。
2条答案
按热度按时间7d7tgy0s1#
所以,有一件事要知道Rx我想很多人一开始都会做(包括我自己!):如果您正在使用任何传统的线程函数,如ResetEvents、Thread.Sleeps或其他函数,那么您就做错了(tm)--这就像在LINQ中将对象强制转换为数组,因为您知道底层类型恰好是数组。
要知道的关键是异步函数是由一个返回
IObservable<TResult>
的函数来表示的--这是一个神奇的调味汁,让你在事情完成时发出信号。下面是你如何"Rx-ify"一个更传统的异步函数,就像你在Silverlight web服务中看到的那样:这是一个不错的尝试,但不幸的是它包含一个争用条件。
这就是
AsyncSubject
的用武之地--这可以确保即使asyncReaderFunc比Subscribe更有效,AsyncSubject仍然会"重放"发生的事情。现在我们有了函数,我们可以对它做很多有趣的事情:
djmepvbi2#
我想进一步补充Paul的评论,即添加WaitHandles意味着你做错了,直接使用Subjects通常意味着你也做错了。- )
试着考虑你的Rx代码与序列或管道一起工作。主题提供读写功能,这意味着你不再与管道或序列一起工作了(除非你有双向的管道或可以反转的序列?!?)
首先,Paul的代码很酷,但是让我们“Rx the hell out of it”(把它从地狱里拿出来)。
1stAsyncStart方法将其更改为
太简单了!看,没有主题,数据只单向流动。这里重要的是签名的变化。它会把东西推给我们。现在这是非常明确的。传递一个主题给我是非常模糊的。
2nd。我们现在不需要在start方法中定义Subject。我们还可以利用Scheduler功能,而不是使用旧的ThreadPool. QueueUserWorkItem。
现在我们有了一个清晰的管道或事件序列
异步启动--〉就绪--〉启动
而不是启动--〉异步启动--〉启动
如果我对你的问题空间有更多的了解,我相信我们可以想出一个更好的方法来完成这个任务,而不需要start方法的阻塞特性。你越多地使用Rx,你就会发现你以前关于什么时候需要阻塞、使用waithandles等的假设可以被抛到九霄云外。