我试着为Rx.Net创建一个SerialGroupBy操作符。该操作符的要点是像GroupBy一样工作,但每次创建一个新组时,前者都会完成。因此,一次打开的组永远不会超过一个。
我当前的“最佳”实现如下所示:
public static IObservable<IGroupedObservable<TKey, TElement>> SerialGroupBy<TKey, TSource, TElement>(
this IObservable<TSource> stream, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector) =>
stream.Publish(shared =>
shared.GroupByUntil(keySelector, elementSelector, group =>
shared.DistinctUntilChanged(keySelector)));
我希望在下一组开始之前关闭该组,就像这里测试的那样:
[Fact]
public void SerialGroupBy()
{
var scheduler = new TestScheduler();
var stream = scheduler.CreateHotObservable(
OnNext(201, "First group"),
OnNext(202, "Second group"),
OnNext(203, "Second group"));
var observer = scheduler.CreateObserver<string>();
stream.SerialGroupBy(x => x.Length, x => x)
.Select(x => x.Subscribe(observer))
.Subscribe();
scheduler.Start();
observer.Messages.ShouldBeLike(
OnNext(201, "First group"),
OnCompleted<string>(202),
OnNext(202, "Second group"),
OnNext(203, "Second group"));
}
但是第一组的完成太晚了,比如:
OnNext(201, "First group"),
OnNext(202, "Second group"),
OnCompleted<string>(202),
OnNext(203, "Second group"));
我可以理解为什么(根据GroupByUntil的实现,在关闭观察者之前通知打开观察者),但是我如何实现它,使组不重叠?
我试过几种不同的方法,但总是以同一问题的变体结束。
1条答案
按热度按时间deyfvvtc1#
这是我最后得出的结论: