在kafka elasticsearch连接器中使用elasticsearch生成的id

jjhzyzn0  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(431)

我注意到使用kafka elasticsearch连接器在elasticsearch中索引的文档的id格式如下 topic+partition+offset .
我更喜欢使用elasticsearch生成的id。看来 topic+partition+offset 通常不是唯一的,所以我丢失了数据。
我怎样才能改变这一点?

p1iqtdky

p1iqtdky1#

@罗宾·莫法特,在我看来, topic-partition-offset 在升级kafka集群的情况下可能会导致重复,但不是以滚动升级的方式,而是用集群替换集群(有时更容易替换)。在这种情况下,您将遇到由于覆盖数据而导致的数据丢失。
关于您的优秀示例,这可能是许多情况下的解决方案,但我要添加另一个选项。也许可以将epoc timestamp元素添加到 topic-partition-offset 所以就这样 topic-partition-offset-current_timestamp .
你怎么认为?

llmtgqce

llmtgqce2#

正如菲尔在评论中所说的—— topic-partition-offset 应该是唯一的,所以我看不出这是如何导致数据丢失。
不管怎样-您可以让连接器生成密钥(就像您正在做的那样),也可以自己定义密钥( key.ignore=false ). 别无选择。
您可以使用kafka connect的单消息转换从数据中的字段派生密钥。根据你在elasticsearch论坛上的信息,似乎有 id 在您的数据中-如果这是唯一的,您可以将其设置为您的密钥,因此也可以将其设置为您的elasticsearch文档id。下面是使用smt定义密钥的示例:


# Add the `id` field as the key using Simple Message Transformations

transforms=InsertKey, ExtractId

# `ValueToKey`: push an object of one of the column fields (`id`) into the key

transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=id

# `ExtractField`: convert key from an object to a plain field

transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=id

(通过https://www.confluent.io/blog/building-real-time-streaming-etl-pipeline-20-minutes/)

相关问题