如果我使用kafka connect来使用消息并存储到s3(使用kafka connect s3连接器),那么我是否可以将消息偏移量与事件负载一起存储?我想有这个数据,使一些秩序的信息,也可以检查是否有任何差距,或检查是否有任何重复的消息,我已经收到(e、 g.如果我的消费者补偿意外地被击倒,我重新启动了Kafka连接)。这是可能的还是我应该为这类功能编写一个自定义订阅服务器?
8dtrkrch1#
根据有关插入字段转换的文档,可以使用 offset.field :
offset.field
Name Description offset.field Field name for Apache Kafka® offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default).
总的来说,单消息转换(smt)配置如下所示:
"transforms": "InsertField", "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.InsertField.offset.field": "offsetColumn"
如果这不是你想要的,那么你总是可以选择创建你的定制转换
1条答案
按热度按时间8dtrkrch1#
根据有关插入字段转换的文档,可以使用
offset.field
:总的来说,单消息转换(smt)配置如下所示:
如果这不是你想要的,那么你总是可以选择创建你的定制转换