.net 使用Rx阻塞(可能超时)异步操作

ki0zmccv  于 2022-11-26  发布在  .NET
关注(0)|答案(2)|浏览(161)

我正在尝试使用Reactive Extensions for .NET重写一些代码,但是我需要一些关于如何实现我的目标的指导。
我有一个类,它在一个低级库中封装了一些异步行为。考虑一下读取或写入网络的东西。当类启动时,它将尝试连接到环境,当成功时,它将通过从工作线程调用来返回信号。
我想把这个异步行为转变成一个同步调用,我已经创建了一个非常简化的例子,下面是如何实现的:

ManualResetEvent readyEvent = new ManualResetEvent(false);

public void Start(TimeSpan timeout) {
  // Simulate a background process
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Wait for startup to complete.
  if (!this.readyEvent.WaitOne(timeout))
    throw new TimeoutException();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay); // Simulate startup delay.
  this.readyEvent.Set();
}

在工作线程上运行AsyncStart只是模拟库的异步行为的一种方式,而不是我的真实的代码的一部分,在实际代码中,低级库提供线程并在回调上调用我的代码。
请注意,如果start没有在超时间隔内完成,Start方法将抛出TimeoutException
我想重写这段代码来使用Rx。下面是我的第一次尝试:

Subject<Unit> readySubject = new Subject<Unit>();

public void Start(TimeSpan timeout) {
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Point A - see below
  this.readySubject.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

这是一个不错的尝试,但不幸的是它包含一个竞态条件。如果启动完成 * 快 *(例如,如果delay为0),并且如果在点A处存在附加延迟,则在First执行之前,将在readySubject上调用OnNext。m应用TimeoutFirst时,不会看到启动已经完成,而是会抛出一个TimeoutException
Observable.Defer似乎是为了处理这样的问题而创建的。下面是使用Rx的稍微复杂一点的尝试:

Subject<Unit> readySubject = new Subject<Unit>();

void Start(TimeSpan timeout) {
  var ready = Observable.Defer(() => {
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
    // Point B - see below
    return this.readySubject.AsObservable();
  });
  ready.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

现在,异步操作不会立即启动,而是仅在使用IObservable时启动。遗憾的是,仍然存在争用条件,但这次是在B点。如果异步操作在Defer lambda返回之前启动调用OnNext,则它仍然丢失,并且Timeout将抛出TimeoutException
我知道我可以使用像Replay这样的操作符来缓冲事件,但是我的初始示例没有使用任何类型的缓冲。有没有办法让我使用Rx来解决我的问题而不出现争用条件?本质上,只有在IObservable连接到TimeoutFirst之后才开始异步操作?
根据Ana Betts的回答,以下是可行的解决方案:

void Start(TimeSpan timeout) {
  var readySubject = new AsyncSubject<Unit>();
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
  // Point C - see below
  readySubject.Timeout(timeout).First();
}

void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
  Thread.Sleep(delay);
  readySubject.OnNext(new Unit());
  readySubject.OnCompleted();
}

有趣的是,当C点的延迟比AsyncStart完成所需的时间长时,AsyncSubject会保留最后发送的通知,TimeoutFirst仍将按预期执行。

7d7tgy0s

7d7tgy0s1#

所以,有一件事要知道Rx我想很多人一开始都会做(包括我自己!):如果您正在使用任何传统的线程函数,如ResetEvents、Thread.Sleeps或其他函数,那么您就做错了(tm)--这就像在LINQ中将对象强制转换为数组,因为您知道底层类型恰好是数组。
要知道的关键是异步函数是由一个返回IObservable<TResult>的函数来表示的--这是一个神奇的调味汁,让你在事情完成时发出信号。下面是你如何"Rx-ify"一个更传统的异步函数,就像你在Silverlight web服务中看到的那样:

IObservable<byte[]> readFromNetwork()
{
    var ret = new AsyncSubject();
    // Here's a traditional async function that you provide a callback to
    asyncReaderFunc(theFile, buffer => {
        ret.OnNext(buffer);
        ret.OnCompleted();
    });

    return ret;
}

这是一个不错的尝试,但不幸的是它包含一个争用条件。
这就是AsyncSubject的用武之地--这可以确保即使asyncReaderFunc比Subscribe更有效,AsyncSubject仍然会"重放"发生的事情。
现在我们有了函数,我们可以对它做很多有趣的事情:

// Make it into a sync function
byte[] results = readFromNetwork().First();

// Keep reading blocks one at a time until we run out
readFromNetwork().Repeat().TakeUntil(x => x == null || x.Length == 0).Subscribe(bytes => {
    Console.WriteLine("Read {0} bytes in chunk", bytes.Length);
})

// Read the entire stream and get notified when the whole deal is finished
readFromNetwork()
    .Repeat().TakeUntil(x => x == null || x.Length == 0)
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
    .Subscribe(ms => {
        Console.WriteLine("Got {0} bytes in total", ms.ToArray().Length);
    });

// Or just get the entire thing as a MemoryStream and wait for it
var memoryStream = readFromNetwork()
    .Repeat().TakeUntil(x => x == null || x.Length == 0)
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
    .First();
djmepvbi

djmepvbi2#

我想进一步补充Paul的评论,即添加WaitHandles意味着你做错了,直接使用Subjects通常意味着你也做错了。- )
试着考虑你的Rx代码与序列或管道一起工作。主题提供读写功能,这意味着你不再与管道或序列一起工作了(除非你有双向的管道或可以反转的序列?!?)
首先,Paul的代码很酷,但是让我们“Rx the hell out of it”(把它从地狱里拿出来)。

1stAsyncStart方法将其更改为

IObservable<Unit> AsyncStart(TimeSpan delay) 
{
  Observable.Timer(delay).Select(_=>Unit.Default);
}

太简单了!看,没有主题,数据只单向流动。这里重要的是签名的变化。它会把东西推给我们。现在这是非常明确的。传递一个主题给我是非常模糊的。

2nd。我们现在不需要在start方法中定义Subject。我们还可以利用Scheduler功能,而不是使用旧的ThreadPool. QueueUserWorkItem。

void Start(TimeSpan timeout) 
{
    var isReady = AsyncStart(TimeSpan.FromSeconds(1))
                    .SubscribeOn(Scheduler.ThreadPool)
                    .PublishLast();
    isReady.Connect();
    isReady.Timeout(timeout).First();
}

现在我们有了一个清晰的管道或事件序列
异步启动--〉就绪--〉启动
而不是启动--〉异步启动--〉启动
如果我对你的问题空间有更多的了解,我相信我们可以想出一个更好的方法来完成这个任务,而不需要start方法的阻塞特性。你越多地使用Rx,你就会发现你以前关于什么时候需要阻塞、使用waithandles等的假设可以被抛到九霄云外。

相关问题