Kafka连接消费者引用偏移量并存储在消息中

3wabscal  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(278)

如果我使用kafka connect来使用消息并存储到s3(使用kafka connect s3连接器),那么我是否可以将消息偏移量与事件负载一起存储?我想有这个数据,使一些秩序的信息,也可以检查是否有任何差距,或检查是否有任何重复的消息,我已经收到(e、 g.如果我的消费者补偿意外地被击倒,我重新启动了Kafka连接)。这是可能的还是我应该为这类功能编写一个自定义订阅服务器?

8dtrkrch

8dtrkrch1#

根据有关插入字段转换的文档,可以使用 offset.field :

Name            Description
offset.field    Field name for Apache Kafka® offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default).

总的来说,单消息转换(smt)配置如下所示:

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "offsetColumn"

如果这不是你想要的,那么你总是可以选择创建你的定制转换

相关问题