In the Kafka Connect Elasticsearch sink connector, you can choose how the Elasticsearch document id is derived. To use the Kafka record key as the document id, key.ignore is set to false, which is a global setting; topics whose records have no key can then be listed under topic.key.ignore, which sets key.ignore to true for those specific topics. I have two topics, one carrying filebeat data and one carrying metricbeat data. For filebeat I need the key to be used as the document id, but not for metricbeat.
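As I understand the documented semantics, the intended settings would look roughly like this (a minimal sketch of my reading of the docs, not a verified configuration):

# global default: use the Kafka record key as the Elasticsearch document id
key.ignore=false
# per-topic override: for the topics listed here the key is ignored and the
# connector generates the document id itself (value is a list of topic names)
topic.key.ignore=metricbeat

Here is the section of the connector's startup log where topic.key.ignore carries the value metricbeat: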
auto.create.indices.at.start = false
batch.size = 2000
behavior.on.malformed.documents = fail
behavior.on.null.values = ignore
compact.map.entries = true
connection.compression = false
connection.password = null
connection.timeout.ms = 1000
connection.url = [http://10.0.14.6:9200]
connection.username = null
drop.invalid.message = false
elastic.https.ssl.cipher.suites = null
elastic.https.ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
elastic.https.ssl.endpoint.identification.algorithm = https
elastic.https.ssl.engine.factory.class = null
elastic.https.ssl.key.password = null
elastic.https.ssl.keymanager.algorithm = SunX509
elastic.https.ssl.keystore.location = null
elastic.https.ssl.keystore.password = null
elastic.https.ssl.keystore.type = JKS
elastic.https.ssl.protocol = TLSv1.3
elastic.https.ssl.provider = null
elastic.https.ssl.secure.random.implementation = null
elastic.https.ssl.trustmanager.algorithm = PKIX
elastic.https.ssl.truststore.location = null
elastic.https.ssl.truststore.password = null
elastic.https.ssl.truststore.type = JKS
elastic.security.protocol = PLAINTEXT
flush.timeout.ms = 10000
key.ignore = false
linger.ms = 1
max.buffered.records = 20000
max.connection.idle.time.ms = 60000
max.in.flight.requests = 5
max.retries = 5
proxy.host =
proxy.password = null
proxy.port = 8080
proxy.username =
read.timeout.ms = 3000
retry.backoff.ms = 100
schema.ignore = true
topic.index.map = []
topic.key.ignore = ['metricbeat']
topic.schema.ignore = []
type.name = doc
write.method = insert
But I still got the following error:
[2020-11-23 04:55:57,920] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Key is used as document id and can not be null. (org.apache.kafka.connect.runtime.WorkerSinkTask:586)
org.apache.kafka.connect.errors.ConnectException: Key is used as document id and can not be null.
at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:82)
at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:182)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:299)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:284)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:138)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[2020-11-23 04:55:57,922] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Key is used as document id and can not be null.
at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:82)
at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:182)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:299)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:284)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:138)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
... 10 more
[2020-11-23 04:55:57,922] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
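For context, the stack trace points at the key-conversion step. Below is my rough reading of the check the connector performs there, written as a simplified Java sketch; it is not the actual Confluent source, and the method name and id format are assumptions:

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

class DocumentIdSketch {
    // Simplified sketch of the check implied by DataConverter.convertKey;
    // not the actual Confluent implementation.
    static String documentId(SinkRecord record, boolean ignoreKeyForThisTopic) {
        if (ignoreKeyForThisTopic) {
            // key.ignore (or a topic.key.ignore match) resolved to true:
            // the id is synthesized, e.g. from topic, partition and offset
            return record.topic() + "+" + record.kafkaPartition() + "+" + record.kafkaOffset();
        }
        if (record.key() == null) {
            // this is the exception seen in the log above
            throw new ConnectException("Key is used as document id and can not be null.");
        }
        return record.key().toString();
    }
}

If that reading is right, the error means a record with a null key arrived on a topic for which the per-topic override did not take effect.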
Here is the connector configuration, in case it is needed:
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=upi-session,metricbeat
auto.create.indices.at.start=false
key.ignore=false
topic.key.ignore='metricbeat'
connection.url=http://10.0.14.6:9200
type.name=doc
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
schema.ignore=true
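For reference, splitting the topics across two connector instances would avoid the per-topic override entirely; a sketch under that assumption (connector names are illustrative):

# connector 1: document id taken from the record key
name=elasticsearch-sink-upi
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=upi-session
key.ignore=false

# connector 2: record key ignored, document id generated by the connector
name=elasticsearch-sink-metricbeat
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=metricbeat
key.ignore=true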
I have already gone through this related question, but it did not help, which is why I am asking a new one: Kafka connect topic.key.ignore not working as expected.