confluent kafka connect elasticsearch id文档创建

mitkmikd  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(651)

我正在使用kafka connect elasticsearch连接器将主题中的数据写入elasticsearch索引。主题消息的键和值都是json格式的。由于以下错误,连接器无法启动:

org.apache.kafka.connect.errors.DataException: MAP is not supported as the document id.

以下是我的消息格式(key | value):

{"key":"OKOK","start":1517241690000,"end":1517241695000}     |   {"measurement":"responses","count":9,"sum":1350.0,"max":150.0,"min":150.0,"avg":150.0}

下面是我用来创建连接器的post请求的主体:

{
 "name": "elasticsearch-sink-connector",
 "config": {
 "connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
 "tasks.max": "1",
 "topics": "output-topic-elastic",
 "connection.url": "http://elasticsearch:9200",
 "type.name": "aggregator",
 "schemas.enable": "false",
 "topic.schema.ignore": "true",
 "topic.key.ignore": "false",
 "value.converter": "org.apache.kafka.connect.json.JsonConverter",
 "value.converter.schemas.enable": "false", 
 "key.converter": "org.apache.kafka.connect.json.JsonConverter",
 "key.converter.schemas.enable": "false", 
 "key.ignore":"false",
 "topic.index.map": "output-topic-elastic:aggregator",
 "name": "elasticsearch-sink",
 "transforms": "InsertKey",
"transforms.InsertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.InsertKey.fields":"key"
}}

任何帮助都将不胜感激。我在StackOverflow1上发现了一个类似的问题,但我没有找到答案。
es文档id创建

bttbmeg0

bttbmeg01#

你还需要 ExtractField 在那里

"transforms": "InsertKey,extractKey",
"transforms.InsertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.InsertKey.fields":"key",
"transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field":"key"

查看此帖子了解更多详细信息。

相关问题