延迟Kafka主题中的一些记录

fzwojiic  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(366)

我有一个消费项目,它使用Kafka主题的数据。该流中90%的数据可以实时处理,但对于特定记录(~10%),我需要延迟处理。
我应该在同一个jvm中有两个独立的使用者,在一个使用者中使用90%的记录,而忽略这10%的记录,让另一个使用者来处理它,还是将10%的消息推送到另一个主题并延迟对另一个主题的处理?
如果我可以有一个消费者和两个检查点机制,一个占90%,另一个延迟10%,那就太好了,但是kafka客户机似乎不支持这个用例。它将帮助我避免任何不必要的反序列化和网络io。

mfuanj7w

mfuanj7w1#

不可能为单个使用者设置多个检查点—或者使用两个使用者或者两个主题。
两个消费者的一个问题是,所有消息都会被读取两次,而不是在完全相同的时间,这就造成了一个问题,即要知道消息何时是历史消息,何时不是历史消息:如果一个消费者在23:59:59读取了今天发布的消息,而另一个消费者在00:00:01读取了该消息,会发生什么情况(与相对时间相同的问题)。您可以使用一些滞后和监控滞后来防止这个问题。
将数据分为两个主题。您可以使用Kafka流或任何其他流处理工具。例如你的 events 主题将被处理并分为两个主题 historical-events 以及 realtime-events . 您仍然会有两个消费者,但主题不同。正如你所提议的,你也可以只消耗你的能量 events 主题,处理即时数据并将历史数据发送到另一个主题(因此您有两个主题而不是三个主题,提交的偏移量没有问题)-但这意味着流程客户机需要更多的io,客户机需要承担两个责任
由于只有一个用户在阅读基本主题中的每条消息,因此它将始终是最新的或历史的,因此不会出现上一期。
只有一个消费者会相应地处理消息,但正如您所指出的,它与偏移提交有关,并且在给定历史批次的情况下可能会使用大量ram。关于偏移提交,您可以简单地存储(在另一个kafka压缩主题中,方法与\u consumer\u offset相同)有关历史或当前偏移的最后一个偏移,重新启动时,从历史批恢复,并忽略所有“最近”的数据,直到达到正确的偏移。这是可能的,但使用更多的内存,而且方式更麻烦。
你的选择在很大程度上取决于什么对你来说是有问题的(io,ram,仅仅是有一个正确的行为)。从一开始就将两个主题分开可能是最容易实现的,可以在单独的流程中完成,有效地将每个流程的责任分开,并将对处理客户机的影响降至最低。

相关问题