kafka表中聚合记录超时?

wfsdck30  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(344)

我用Kafka来处理信息。消息可以分为几个部分(它是一个复合消息)。所以在流中,我可以有一个复合消息,它被分成三部分。换句话说,Kafka流中有三条记录,但这是一条重要的信息。我想使用kafka表合并一个kafka记录中的复合消息部分。合并后,一条消息将插入数据库(postgres)。每个零件都有零件号和零件总数。例如,如果我在流中有一条消息的三个部分(三个Kafka记录),那么每个部分都有值为3的部分总数字段。
我的理解是,在积极的场景中,任务很简单:在表中聚合部分,从表中创建流,并过滤具有相同聚合部分大小和部分总数的记录,在一个合并消息中Map过滤并将其插入数据库(postgres)。
但消极的情况也是可能的。在极少数情况下,其中一个部分根本不能插入Kafka(或者它将在很长时间后,在超时后插入)。例如,在流中,一条复合消息的三个部分中只有两个部分会出现。在这种情况下,我必须在数据库(postgres)中插入未完全构造的消息(它将只包含两部分,而不是三部分)。我怎样才能在Kafka身上实现这种消极情景?

mi7gmzs6

mi7gmzs61#

我建议检查一下标点符号:https://docs.confluent.io/current/streams/developer-guide/processor-api.html#defining-a流处理器
另外请注意,您可以混合和匹配处理器api和dsl:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#applying-处理器和转换器处理器api集成
如果为ktable聚合提供存储名称,则可以将存储连接到注册标点符号的自定义处理器。总的来说,对整个应用程序使用处理器api可能比dsl更好。

相关问题