因为我找不到任何不使用循环来获取流内容的实现,所以我开始实现一个,但我面临着几个问题,也许你们中的一些人可以给我指出正确的位置。
实现使用pub/sub和流的组合:log->stream channellog:notification ->发布/订阅*log:lastreadmessage ->包含流中的最后一个读取密钥
出版商
static async Task Main(string[] args)
{
var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
var redisDb = connectionMultiplexer.GetDatabase(1);
while(true)
{
var value = new NameValueEntry[]
{
new NameValueEntry("id", Guid.NewGuid().ToString()),
new NameValueEntry("timestamp", DateTime.UtcNow.ToString())
};
redisDb.StreamAdd("log", value);
var publisher = connectionMultiplexer.GetSubscriber();
publisher.Publish("log:notify", string.Empty, CommandFlags.None);
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
用户
static async Task Main(string[] args)
{
var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
var redisDb = connectionMultiplexer.GetDatabase(1);
var observableStream = CreateTaskFromStream(connectionMultiplexer, redisDb, "log")
.Subscribe(x => {
Console.WriteLine(x);
});
Console.ReadLine();
}
private static SemaphoreSlim taskFromStreamBlocker = new SemaphoreSlim(1);
private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel)
{
var lastReadMessage = "0-0";
var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
if (string.IsNullOrEmpty(lastReadMessageData))
{
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
}
else
{
lastReadMessage = lastReadMessageData;
}
return Observable.Create<string>(obs =>
{
var subscriber = connection.GetSubscriber();
subscriber.Subscribe($"{channel}:notify", async (ch, msg) =>
{
var locker = await taskFromStreamBlocker
.WaitAsync(0)
.ConfigureAwait(false);
if (!locker)
{
return;
}
var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
foreach(var message in messages)
{
obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
lastReadMessage = message.Id;
}
redisDb.KeyDelete($"{channel}:lastReadMessage");
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
taskFromStreamBlocker.Release();
});
return Disposable.Create(() => subscriber.Unsubscribe(channel));
});
}
为什么是信号灯?
因为我可以在流中添加很多消息,而且我不希望同一条消息被处理两次。
问题
如果我们在流中有未处理的消息,那么我们如何在没有来自pub/sub的事件的情况下进行处理当我们启动时,我们可以验证它是否是未处理的消息并对其进行处理。如果在此期间向流中添加了一条新消息,并且我们还没有订阅发布/订阅,那么在我们通过发布/订阅接收到通知之前,订阅服务器将不会处理该消息。
信号量很重要,不能处理同一条消息两次,但同时它是一个诅咒。在消息处理过程中,可以向流中添加另一个消息。当这种情况发生时,订户将不会立即处理,而只会在下次收到通知时处理(此时将处理两条消息)。
你将如何实现这一点?是否有只使用rx的redis流的实现?解决方案不应该使用某种循环,而且应该是内存有效的。这可能吗?
最美好的祝福
保罗·阿博姆·平托
3条答案
按热度按时间vwkv1x7d1#
这是我想避免的解决方案
ymzxtsji2#
我用了一个紧环,只是做了一个X范围和保存一个位置-吻。。但如果没有工作,它就会后退,所以当有很多事情发生时,它的速度相当快。
如果你需要更高的性能,如阅读,而处理,但我会提醒对大多数情况下,这一点。
它创造了很多复杂性,这需要坚如磐石。
redis通常足够快
“我不想让同一条消息被处理两次。”几乎每个系统都至少有一次传递,消除这种崩溃是令人难以置信的困难/缓慢。您可以通过使用一组hashset的id来部分地删除它,但是对于消费者来说,处理它和设计为幂等的消息非常简单。这可能是消息设计问题的根本原因。如果对每个读卡器进行分区(单独的流和每个流一个worker),则可以将hashset保留在内存中,从而避免伸缩/分布式问题。注意redis流可以保持顺序使用它来生成更简单的幂等消息。
异常,你不想停止处理一个流,因为一个消费者在一条消息上有一个逻辑异常,比如晚上接到一个电话,整个系统都停止了,锁会使情况更糟。事件数据无法更改,因此必须尽最大努力。然而,infra/redis异常确实需要抛出并重试。在一个循环之外管理这个是非常痛苦的。
简单的背压。如果你不能足够快地处理工作,循环会变慢,而不是创建大量的任务,耗尽你所有的记忆。
我不再使用分布式锁/信号量了。
如果您处理命令(如dosomething而不是xyz)的情况发生了,这些命令可能会失败。同样,消费者应该处理已经发生的情况,而不是redis/流读取部分。
一些具有神奇回调功能的lib无法解决这些问题,在任何节点上运行超时时,回调都会重试。复杂性/问题仍然存在,它们只是移动到其他地方。
你可能有一个可观察的消费者,但这基本上是化妆品,它不能解决问题,如果你在许多实现下看某处,你会看到相同的循环。我不会用这个来代替让消费者注册一个动作。
如
在您的例子中,回调可以有可观察的,而不使用循环,但是下面有一个低级循环,它还可以为使用者进行消息到对象的转换。
xkftehaa3#
这是另一个使用200毫秒计时器的解决方案