# Add the `id` field as the key using Simple Message Transformations
transforms=InsertKey, ExtractId
# `ValueToKey`: push an object of one of the column fields (`id`) into the key
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=id
# `ExtractField`: convert key from an object to a plain field
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=id
2条答案
按热度按时间p1iqtdky1#
@罗宾·莫法特,在我看来,
topic-partition-offset
在升级kafka集群的情况下可能会导致重复,但不是以滚动升级的方式,而是用集群替换集群(有时更容易替换)。在这种情况下,您将遇到由于覆盖数据而导致的数据丢失。关于您的优秀示例,这可能是许多情况下的解决方案,但我要添加另一个选项。也许可以将epoc timestamp元素添加到
topic-partition-offset
所以就这样topic-partition-offset-current_timestamp
.你怎么认为?
llmtgqce2#
正如菲尔在评论中所说的——
topic-partition-offset
应该是唯一的,所以我看不出这是如何导致数据丢失。不管怎样-您可以让连接器生成密钥(就像您正在做的那样),也可以自己定义密钥(
key.ignore=false
). 别无选择。您可以使用kafka connect的单消息转换从数据中的字段派生密钥。根据你在elasticsearch论坛上的信息,似乎有
id
在您的数据中-如果这是唯一的,您可以将其设置为您的密钥,因此也可以将其设置为您的elasticsearch文档id。下面是使用smt定义密钥的示例:(通过https://www.confluent.io/blog/building-real-time-streaming-etl-pipeline-20-minutes/)