如何在Rx.Net中实现exhaustMap处理程序?

hwamh0ep  于 2023-03-13  发布在  .NET
关注(0)|答案(1)|浏览(122)

我正在从rxjs中寻找类似于exhaustMap运算符的东西,但RX.NET似乎没有这样的运算符。
我需要实现的是,在源码流的每个元素上,我需要启动一个async处理程序,直到它完成,我希望从源码中删除任何元素。
我不希望在每个元素上启动一个异步处理程序--在处理程序运行时,我希望删除源元素。
我还怀疑我需要在这里巧妙地使用defer操作符。
谢谢大家!

rjee0c15

rjee0c151#

下面是ExhaustMap操作符的实现。源可观测量被投影到IObservable<(Task<TResult>, int)>,其中每个后续任务要么是前一个任务(如果它仍在运行),要么是与当前项关联的新任务。然后,使用DistinctUntilChanged操作符删除重复出现的同一任务,最后,使用Concat操作符将可观测量展平。

/// <summary>Invokes an asynchronous function for each element of an observable
/// sequence, ignoring elements that are emitted before the completion of an
/// asynchronous function of a preceding element.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, Task<TResult>> function)
{
    return source
        .Scan((Task: Task.FromResult<TResult>(default), Id: 0), (previous, item) =>
            !previous.Task.IsCompleted ? previous : (function(item), unchecked(previous.Id + 1)))
        .DistinctUntilChanged()
        .Select(e => e.Task)
        .Concat();
}

function返回的任务不保证是不同的。例如,方法async Task<T> Return<T>(T result) => result;对于result = 1result = false总是返回相同的Task。因此,在上述实现中需要递增的Id,其使任务个体化。以便DistinctUntilChanged不会从单独的function调用中过滤掉任务。
用法示例:

Observable
    .Interval(TimeSpan.FromMilliseconds(200))
    .Select(x => (int)x + 1)
    .Take(10)
    .Do(x => Console.WriteLine($"Produced #{x}"))
    .ExhaustMap(async x => { await Task.Delay(x % 3 == 0 ? 500 : 100); return x; })
    .Do(x => Console.WriteLine($"--Result: {x}"))
    .Wait();

输出:

Produced #1
--Result: 1
Produced #2
--Result: 2
Produced #3
Produced #4
Produced #5
--Result: 3
Produced #6
Produced #7
Produced #8
--Result: 6
Produced #9
Produced #10

Online demo .
下面是ExhaustMap的替代实现,其中function生成IObservable<TResult>而不是Task<TResult>

/// <summary>Projects each element to an observable sequence, which is merged
/// in the output observable sequence only if the previous projected observable
/// sequence has completed.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, IObservable<TResult>> function)
{
    return Observable.Defer(() =>
    {
        int mutex = 0; // 0: not acquired, 1: acquired
        return source.SelectMany(item =>
        {
            // Attempt to acquire the mutex immediately. If successful, return
            // a sequence that releases the mutex when terminated. Otherwise,
            // return immediately an empty sequence.
            if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
                return function(item).Finally(() => Volatile.Write(ref mutex, 0));
            return Observable.Empty<TResult>();
        });
    });
}

相关问题