topic.key.ignore in kafka elasticsearch connect不工作?

vatpfxk5  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(352)

在kafka elasticsearch中,connect可以选择修改elasticsearch中的文档id。为此,我们可以将key.ignore标记为false,这是一个全局设置;对于没有键的主题,可以在topic.key.ignore下标记,这将为该特定主题设置key.ignore为true。我有两个主题一是filebeat数据和metricbeat数据。对于文件beat,我需要使用key作为文档id,不需要metricbeat。下面是日志中topic.key.ignore具有值metricbeat的部分

auto.create.indices.at.start = false
batch.size = 2000
behavior.on.malformed.documents = fail
behavior.on.null.values = ignore
compact.map.entries = true
connection.compression = false
connection.password = null
connection.timeout.ms = 1000
connection.url = [http://10.0.14.6:9200]
connection.username = null
drop.invalid.message = false
elastic.https.ssl.cipher.suites = null
elastic.https.ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
elastic.https.ssl.endpoint.identification.algorithm = https
elastic.https.ssl.engine.factory.class = null
elastic.https.ssl.key.password = null
elastic.https.ssl.keymanager.algorithm = SunX509
elastic.https.ssl.keystore.location = null
elastic.https.ssl.keystore.password = null
elastic.https.ssl.keystore.type = JKS
elastic.https.ssl.protocol = TLSv1.3
elastic.https.ssl.provider = null
elastic.https.ssl.secure.random.implementation = null
elastic.https.ssl.trustmanager.algorithm = PKIX
elastic.https.ssl.truststore.location = null
elastic.https.ssl.truststore.password = null
elastic.https.ssl.truststore.type = JKS
elastic.security.protocol = PLAINTEXT
flush.timeout.ms = 10000
key.ignore = false
linger.ms = 1
max.buffered.records = 20000
max.connection.idle.time.ms = 60000
max.in.flight.requests = 5
max.retries = 5
proxy.host = 
proxy.password = null
proxy.port = 8080
proxy.username = 
read.timeout.ms = 3000
retry.backoff.ms = 100
schema.ignore = true
topic.index.map = []

***topic.key.ignore = ['metricbeat']***

topic.schema.ignore = []
type.name = doc
write.method = insert

但仍然得到了错误

2020-11-23 04:55:57,920] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Key is used as document id and can not be null. (org.apache.kafka.connect.runtime.WorkerSinkTask:586)
org.apache.kafka.connect.errors.ConnectException: Key is used as document id and can not be null.
    at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:82)
    at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:182)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:299)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:284)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:138)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
[2020-11-23 04:55:57,922] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Key is used as document id and can not be null.
    at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:82)
    at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:182)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:299)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:284)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:138)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
    ... 10 more
[2020-11-23 04:55:57,922] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)

这是配置,如果需要的话。

ame=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=upi-session,metricbeat
auto.create.indices.at.start=false
key.ignore=false
topic.key.ignore='metricbeat'
connection.url=http://10.0.14.6:9200
type.name=doc
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
schema.ignore=true

这个问题我已经检查过了,但没有帮助,这迫使我问了一个新问题:-Kafka连接主题.key.ignore没有按预期工作

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题