我有一个Kafka的主题 HrEvents
,其中包含大量 Hire
, Fire
, Quit
, Promotion
以及 Demotion
信息。每个hr事件消息都有一个 employee_id
属性(也是用于分区的键)和 data
属性,该属性可能包含有关hr事件的任意详细信息。
问题是 data
我的应用程序需要能够处理的blob没有很好的文档记录,并且有可能在任何时候,应用程序无法处理的hr事件被消耗。
重要的是-对于每个 employee_id
-应用程序按顺序处理所有hr事件。同样重要的是,在这种处理失败之后 employee_id
,所有其他人力资源事件 employee_id
我们可以继续。
失败的hr事件,以及该事件的所有后续hr事件 employee_id
应该发布到死信队列。一旦应用程序被修补-并支持另一种未记录的 data
添加了blob-这些hr事件可以从死信队列中使用。
我意识到这也需要在消费者中维护某种形式的关键黑名单,其中包括 employee_id
存储死信队列中至少有一个未使用的hr事件消息的。
是否有现有的解决方案/java库允许我实现这个问题的解决方案?
请原谅我的无知,但我正试图找到解决上述问题的方法,但我怀疑我可能没有用正确的术语进行搜索。随便教我吧。
1条答案
按热度按时间6ljaweal1#
听起来你应该可以利用Kafka流。
你的死信队列可以建立一个ktable,形成一种黑名单。当新事件出现在原始主题中时,您需要对现有id的ktable执行查找,并将传入事件附加到该id尚未处理的事件的值列表中