/// <summary>Invokes an asynchronous function for each element of an observable
/// sequence, ignoring elements that are emitted before the completion of an
/// asynchronous function of a preceding element.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, Task<TResult>> function)
{
return source
.Scan((Task: Task.FromResult<TResult>(default), Id: 0), (previous, item) =>
!previous.Task.IsCompleted ? previous : (function(item), unchecked(previous.Id + 1)))
.DistinctUntilChanged()
.Select(e => e.Task)
.Concat();
}
Produced #1
--Result: 1
Produced #2
--Result: 2
Produced #3
Produced #4
Produced #5
--Result: 3
Produced #6
Produced #7
Produced #8
--Result: 6
Produced #9
Produced #10
/// <summary>Projects each element to an observable sequence, which is merged
/// in the output observable sequence only if the previous projected observable
/// sequence has completed.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, IObservable<TResult>> function)
{
return Observable.Defer(() =>
{
int mutex = 0; // 0: not acquired, 1: acquired
return source.SelectMany(item =>
{
// Attempt to acquire the mutex immediately. If successful, return
// a sequence that releases the mutex when terminated. Otherwise,
// return immediately an empty sequence.
if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
return function(item).Finally(() => Volatile.Write(ref mutex, 0));
return Observable.Empty<TResult>();
});
});
}
1条答案
按热度按时间rjee0c151#
下面是
ExhaustMap
操作符的实现。源可观测量被投影到IObservable<(Task<TResult>, int)>
,其中每个后续任务要么是前一个任务(如果它仍在运行),要么是与当前项关联的新任务。然后,使用DistinctUntilChanged
操作符删除重复出现的同一任务,最后,使用Concat
操作符将可观测量展平。由
function
返回的任务不保证是不同的。例如,方法async Task<T> Return<T>(T result) => result;
对于result = 1
或result = false
总是返回相同的Task
。因此,在上述实现中需要递增的Id
,其使任务个体化。以便DistinctUntilChanged
不会从单独的function
调用中过滤掉任务。用法示例:
输出:
Online demo .
下面是
ExhaustMap
的替代实现,其中function
生成IObservable<TResult>
而不是Task<TResult>
: