我正试图在kafka(apachebeam)中精确配置once语义。以下是我将要介绍的变化:
制作人: enable.idenpotence
=正确 transactional.id
=唯一事务ID
消费者:
套 enable.auto.commit
=错误
//在consumer builder中添加了以下内容:
.commitOffsetsInFinalize() .withReadCommitted()
将以下内容添加到 KafkaIO#write
建设者: .withEOS(numShards, sinkGroupId)
有没有人知道还有什么应该改变,以实现apachebeam kafkaio中的一次语义?
上面的配置看起来不错还是我误解了smth?
我需要指定 transactional.id
属性(因为我在apachebeam中没有显式生产者)?
1条答案
按热度按时间mqxuamgl1#
好吧,看来我终于找到了适合我要求的合适设置。以下是我的结论:
KafkaIO.Read
:使用更新使用者属性
enable.auto.commit = false
.withReadCommitted().commitOffsetsInFinalize()
2)KafkaIO#write
:.withEOS(numShards, sinkGroupId)
它还将启用幂等性并设置transactional.id
在引擎盖下给制作人。因此,在这样的设置下,我们将在读取时至少有一次语义,在写入时正好有一次语义。