kafkaconnect elasticsearch文档id创建

nkcskrwz  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(463)

我当前正在使用以下连接器配置,并得到异常“密钥用作文档id,不能为空”

{

    "name" :"hello7",
    "config" : {
        "name": "hello7",
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "hello7",
        "connection.url":"http://127.0.0.1:8080/",
        "type.name":"aggregator",
        "schema.ignore": "true",
        "topic.schema.ignore": "true",
        "topic.key.ignore": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false", 
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "key.ignore":"false",
        "transforms": "extractKey",
        "transforms.InsertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.InsertKey.fields":"customerId",
        "transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractKey.field":"customerId",
        "errors.log.enable":true,
        "errors.log.include.messages":true

     }
}

我将向主题发送以下信息

{

  "customerId" : "i7y32o4823",
  "customerName" : "JOE",
  "address":"123 main street",
  "employee" : "ABC Company",
  "employeeAddress" : "178 Main Street"

}

我得到以下错误
2020-01-17 16:28:33624]error workersinktask{id=hello7-0}任务引发了未捕获且不可恢复的异常。任务正在终止,在手动重新启动之前不会恢复(org.apache.kafka.connect.runtime.workersinktask)org.apache.kafka.connect.errors.connectexception:密钥用作文档id,不能为空。at io.confluent.connect.elasticsearch.dataconverter.convertkey(dataconverter。java:79)在io.confluent.connect.elasticsearch.dataconverter.convertrecord(dataconverter。java:160)at io.confluent.connect.elasticsearch.elasticsearchwriter.trywriterecord(elasticsearchwriter。java:285)在io.confluent.connect.elasticsearch.elasticsearchwriter.write(elasticsearchwriter。java:270)at io.confluent.connect.elasticsearch.elasticsearchsinktask.put(elasticsearchsinktask。java:169)

ntjbwcob

ntjbwcob1#

你已经准备好了 "key.ignore":"false" 只提到了你对Kafka主题的评价。
Kafka记录既有键也有值。如果不指定键,它将为空。
正如错误所说,elasticsearch接收器连接器不接受空键
connectexception:密钥用作文档id,不能为空
此外,您只提取变换中的键,而不使用 InsertKey ```
"transforms": "extractKey",

可以使用到控制台的独立filesinkconnector输出调试连接器

name=local-console-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=hello7

相关问题