如何用c#rx实现redis流

4szc88ey  于 2021-06-09  发布在  Redis
关注(0)|答案(3)|浏览(503)

因为我找不到任何不使用循环来获取流内容的实现,所以我开始实现一个,但我面临着几个问题,也许你们中的一些人可以给我指出正确的位置。
实现使用pub/sub和流的组合:log->stream channellog:notification ->发布/订阅*log:lastreadmessage ->包含流中的最后一个读取密钥
出版商

static async Task Main(string[] args)
        {
            var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
            var redisDb =  connectionMultiplexer.GetDatabase(1);

            while(true)
            {
                var value =  new NameValueEntry[]
                {
                    new NameValueEntry("id", Guid.NewGuid().ToString()),
                    new NameValueEntry("timestamp", DateTime.UtcNow.ToString())
                };

                redisDb.StreamAdd("log", value);
                var publisher = connectionMultiplexer.GetSubscriber();
                publisher.Publish("log:notify", string.Empty, CommandFlags.None);
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
        }

用户

static async Task Main(string[] args)
        {
            var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
            var redisDb =  connectionMultiplexer.GetDatabase(1);

            var observableStream =  CreateTaskFromStream(connectionMultiplexer, redisDb, "log")
                .Subscribe(x => {
                  Console.WriteLine(x);  
                });

            Console.ReadLine();
        }
private static SemaphoreSlim taskFromStreamBlocker = new SemaphoreSlim(1);

        private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel)
        {
            var lastReadMessage = "0-0";

            var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
            if (string.IsNullOrEmpty(lastReadMessageData))
            {
                redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
            }
            else
            {
                lastReadMessage = lastReadMessageData;
            }

            return Observable.Create<string>(obs => 
            {
                var subscriber = connection.GetSubscriber();
                subscriber.Subscribe($"{channel}:notify", async (ch, msg) => 
                {
                    var locker = await taskFromStreamBlocker
                        .WaitAsync(0)
                        .ConfigureAwait(false);

                    if (!locker)
                    {
                        return;
                    }

                    var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);

                    foreach(var message in messages)
                    {
                        obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                        lastReadMessage = message.Id;
                    }

                    redisDb.KeyDelete($"{channel}:lastReadMessage");
                    redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);

                    taskFromStreamBlocker.Release();
                });

                return Disposable.Create(() => subscriber.Unsubscribe(channel));
            });
        }

为什么是信号灯?
因为我可以在流中添加很多消息,而且我不希望同一条消息被处理两次。
问题
如果我们在流中有未处理的消息,那么我们如何在没有来自pub/sub的事件的情况下进行处理当我们启动时,我们可以验证它是否是未处理的消息并对其进行处理。如果在此期间向流中添加了一条新消息,并且我们还没有订阅发布/订阅,那么在我们通过发布/订阅接收到通知之前,订阅服务器将不会处理该消息。
信号量很重要,不能处理同一条消息两次,但同时它是一个诅咒。在消息处理过程中,可以向流中添加另一个消息。当这种情况发生时,订户将不会立即处理,而只会在下次收到通知时处理(此时将处理两条消息)。
你将如何实现这一点?是否有只使用rx的redis流的实现?解决方案不应该使用某种循环,而且应该是内存有效的。这可能吗?
最美好的祝福
保罗·阿博姆·平托

vwkv1x7d

vwkv1x7d1#

这是我想避免的解决方案

private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
        {
            var lastReadMessage = "0-0";

            var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
            if (string.IsNullOrEmpty(lastReadMessageData))
            {
                redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
            }
            else
            {
                lastReadMessage = lastReadMessageData;
            }

            return Observable.Create<string>(async obs => 
            {
                while(!cancellationToken.IsCancellationRequested)
                {
                    var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);

                    foreach(var message in messages)
                    {
                        obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                        lastReadMessage = message.Id;
                    }

                    redisDb.KeyDelete($"{channel}:lastReadMessage");
                    redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);

                    await Task.Delay(TimeSpan.FromMilliseconds(500));
                }

                return Disposable.Empty;
            });
        }
ymzxtsji

ymzxtsji2#

我用了一个紧环,只是做了一个X范围和保存一个位置-吻。。但如果没有工作,它就会后退,所以当有很多事情发生时,它的速度相当快。
如果你需要更高的性能,如阅读,而处理,但我会提醒对大多数情况下,这一点。
它创造了很多复杂性,这需要坚如磐石。
redis通常足够快
“我不想让同一条消息被处理两次。”几乎每个系统都至少有一次传递,消除这种崩溃是令人难以置信的困难/缓慢。您可以通过使用一组hashset的id来部分地删除它,但是对于消费者来说,处理它和设计为幂等的消息非常简单。这可能是消息设计问题的根本原因。如果对每个读卡器进行分区(单独的流和每个流一个worker),则可以将hashset保留在内存中,从而避免伸缩/分布式问题。注意redis流可以保持顺序使用它来生成更简单的幂等消息。
异常,你不想停止处理一个流,因为一个消费者在一条消息上有一个逻辑异常,比如晚上接到一个电话,整个系统都停止了,锁会使情况更糟。事件数据无法更改,因此必须尽最大努力。然而,infra/redis异常确实需要抛出并重试。在一个循环之外管理这个是非常痛苦的。
简单的背压。如果你不能足够快地处理工作,循环会变慢,而不是创建大量的任务,耗尽你所有的记忆。
我不再使用分布式锁/信号量了。
如果您处理命令(如dosomething而不是xyz)的情况发生了,这些命令可能会失败。同样,消费者应该处理已经发生的情况,而不是redis/流读取部分。
一些具有神奇回调功能的lib无法解决这些问题,在任何节点上运行超时时,回调都会重试。复杂性/问题仍然存在,它们只是移动到其他地方。
你可能有一个可观察的消费者,但这基本上是化妆品,它不能解决问题,如果你在许多实现下看某处,你会看到相同的循环。我不会用这个来代替让消费者注册一个动作。

public interface IStreamSubscriber
    {
        void RegisterEventCallBack(Func<object, IReadOnlyDictionary<string, string>, Task> callback);
        void RegisterBatchEventCallBack(Func<IEnumerable<(object msg, IReadOnlyDictionary<string, string> metaData)>, Task> batchCallback);
        void Start();
    }

在您的例子中,回调可以有可观察的,而不使用循环,但是下面有一个低级循环,它还可以为使用者进行消息到对象的转换。

xkftehaa

xkftehaa3#

这是另一个使用200毫秒计时器的解决方案

private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
        {
            var lastReadMessage = "0-0";

            var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
            if (string.IsNullOrEmpty(lastReadMessageData))
            {
                redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
            }
            else
            {
                lastReadMessage = lastReadMessageData;
            }

            var instance = ThreadPoolScheduler.Instance;

            return Observable.Create<string>(obs => 
            {
                var disposable = Observable
                    .Interval(TimeSpan.FromMilliseconds(200), instance)
                    .Subscribe(async _ => 
                    {
                        var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);

                        foreach(var message in messages)
                        {
                            obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                            lastReadMessage = message.Id;
                        }

                        redisDb.KeyDelete($"{channel}:lastReadMessage");
                        redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
                    });
                cancellationToken.Register(() => disposable.Dispose());

                return Disposable.Empty;    
            });
       }

相关问题