.NET Reactive Extensions: non-overlapping, serial GroupBy (or WindowUntilChange)

Posted by fwzugrvs on 2023-03-09 in .NET

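I am trying to create a SerialGroupBy operator for Rx.Net. The point of the operator is to work like GroupBy, except that each time a new group is created, the previous one is completed. As a result, there is never more than one group open at a time.
My current "best" implementation looks like this: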

public static IObservable<IGroupedObservable<TKey, TElement>> SerialGroupBy<TKey, TSource, TElement>(
    this IObservable<TSource> stream, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector) =>
    stream.Publish(shared => 
        shared.GroupByUntil(keySelector, elementSelector, group => 
            shared.DistinctUntilChanged(keySelector)));

I expect the current group to be closed before the next one starts, as tested here:

[Fact]
public void SerialGroupBy()
{
    var scheduler = new TestScheduler();

    var stream = scheduler.CreateHotObservable(
        OnNext(201, "First group"),
        OnNext(202, "Second group"),
        OnNext(203, "Second group"));

    var observer = scheduler.CreateObserver<string>();

    stream.SerialGroupBy(x => x.Length, x => x)
        .Select(x => x.Subscribe(observer))
        .Subscribe();

    scheduler.Start();

    observer.Messages.ShouldBeLike(
        OnNext(201, "First group"),
        OnCompleted<string>(202),
        OnNext(202, "Second group"),
        OnNext(203, "Second group"));
}

But the first group completes too late. The observed messages are:

OnNext(201, "First group"),
OnNext(202, "Second group"),
OnCompleted<string>(202),
OnNext(203, "Second group")

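I can understand why (according to the implementation of GroupByUntil, the observer of the newly opened group is notified before the observer of the closing group), but how can I implement this so that the groups do not overlap?
I have tried several different approaches, but I always end up with a variant of the same problem.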

Answer #1 (deyfvvtc)

This is what I ended up with:

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SerialGroupByOperator
{
    public static IObservable<IGroupedObservable<TKey, TElement>> SerialGroupBy<TKey, TSource, TElement>(
        this IObservable<TSource> stream, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector) =>
        Observable.Create<IGroupedObservable<TKey, TElement>>(observer => stream
            .Scan<TSource, GroupedObservable<TKey, TElement>>(null, (current, next) =>
            {
                var key = keySelector(next);

                if (current == null)
                {
                    var nextGroup = new GroupedObservable<TKey, TElement>(key);

                    observer.OnNext(nextGroup);
                    nextGroup.Group.OnNext(elementSelector(next));

                    return nextGroup;
                }

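                // Key changed: complete the current group before announcing the next one,
                // so that two groups are never open at the same time.
                if (!Equals(key, current.Key))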
                {
                    current.Group.OnCompleted();

                    var nextGroup = new GroupedObservable<TKey, TElement>(key);

                    observer.OnNext(nextGroup);
                    nextGroup.Group.OnNext(elementSelector(next));

                    return nextGroup;
                }

                current.Group.OnNext(elementSelector(next));

                return current;
            })
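            // When the source completes, complete the last open group (if any),
            // then complete the outer sequence.
            .LastOrDefaultAsync()
            .Where(x => x != null)
            .Subscribe(x => x.Group.OnCompleted(), observer.OnError, observer.OnCompleted));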
}

internal class GroupedObservable<TKey, TElement> : IGroupedObservable<TKey, TElement>
{
    public GroupedObservable(TKey key)
    {
        Key = key;
        Group = new Subject<TElement>();
    }

    public TKey Key { get; }
    public ISubject<TElement> Group { get; }

    public IDisposable Subscribe(IObserver<TElement> observer) => Group.Subscribe(observer);
}
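
To check the ordering without a TestScheduler, the operator above can be driven by a plain Subject. The following is only a minimal sketch: it assumes the SerialGroupBy extension from this answer is in scope and that the System.Reactive package is referenced. SerialGroupByDemo, Run, and the log format are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; relies on the SerialGroupBy extension defined above.
public static class SerialGroupByDemo
{
    // Pushes three strings through SerialGroupBy and records the order
    // in which the group notifications are observed.
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<string>();

        using var subscription = source
            .SerialGroupBy(x => x.Length, x => x)
            .Subscribe(group => group.Subscribe(
                value => log.Add($"[{group.Key}] OnNext: {value}"),
                () => log.Add($"[{group.Key}] OnCompleted")));

        source.OnNext("First group");   // length 11: opens the first group
        source.OnNext("Second group");  // length 12: completes the first group, then opens the second
        source.OnNext("Second group");  // same key: stays in the second group
        source.OnCompleted();           // completes the last open group

        return log;
    }
}
```

With this input, the completion of group 11 should be logged before the first element of group 12, which is the non-overlapping order the question asks for. The inner subscriptions are not disposed here, which is acceptable for a short-lived demo but not for production code.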
