My topic schema looks like this:
{
  "schema": {
    "type": "string",
    "optional": false
  },
  "payload": "{...}"
}
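For reference, with "value.converter.schemas.enable" set to true the JsonConverter expects every message value to carry a schema/payload envelope like the one above, and a schema of type "string" makes the entire payload one opaque string. An envelope whose struct schema exposes individual fields, including a map-typed tags field, would look roughly like this (field names here are illustrative, not from my actual data):

```json
{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [
      { "field": "name", "type": "string", "optional": false },
      { "field": "tags", "type": "map", "optional": true,
        "keys":   { "type": "string", "optional": false },
        "values": { "type": "string", "optional": false } }
    ]
  },
  "payload": {
    "name": "job-42",
    "tags": { "site": "A" }
  }
}
```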
I'm trying to sink it into InfluxDB with this connector configuration:
curl -X PUT \
-H "Content-Type: application/json" \
--data '{
"connector.class":"io.confluent.influxdb.InfluxDBSinkConnector",
"tasks.max": "1",
"topics" : "mongo-sink-json.tractor.job",
"influxdb.url": "http://10.100.87.169:8086",
"influxdb.db": "mongo-sink",
"infuxdb.password": "password",
"influxdb.username": "user",
"measurement.name.format": "${topic}",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"
}' \
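The command above never states its target URL; when updating a connector through the Kafka Connect REST API, the PUT normally goes to the connector's /config endpoint. A sketch, assuming a worker on localhost:8083 and a connector named influx-sink-json-job (both are assumptions, substitute your own):

```shell
# Assumed worker host/port and connector name -- adjust to your setup.
curl -X PUT \
  -H "Content-Type: application/json" \
  --data '{ ... same JSON body as above ... }' \
  http://localhost:8083/connectors/influx-sink-json-job/config
```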
but I'm getting this error in the Kafka Connect log:
ERROR WorkerSinkTask{id=influx_sink_json_job-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: java.util.ArrayList cannot be cast to java.util.Map (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.util.Map
    at io.confluent.influxdb.sink.writer.InfluxDBWriter.extractPointTags(InfluxDBWriter.java:288)
    at io.confluent.influxdb.sink.writer.InfluxDBWriter.getPointTags(InfluxDBWriter.java:266)
    at io.confluent.influxdb.sink.writer.InfluxDBWriter.getKey(InfluxDBWriter.java:329)
    at io.confluent.influxdb.sink.writer.InfluxDBWriter.getBatch(InfluxDBWriter.java:149)
    at io.confluent.influxdb.sink.writer.InfluxDBWriter.write(InfluxDBWriter.java:126)
    at io.confluent.influxdb.sink.InfluxDBSinkTask.put(InfluxDBSinkTask.java:40)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2020-09-25 20:46:53,519] ERROR WorkerSinkTask{id=influx_sink_json_job-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.util.Map
    at io.confluent.influxdb.sink.writer.InfluxDBWriter.extractPointTags(InfluxDBWriter.java:288)
    at io.confluent.influxdb.sink.writer.InfluxDBWriter.getPointTags(InfluxDBWriter.java:266)
    at io.confluent.influxdb.sink.writer.InfluxDBWriter.getKey(InfluxDBWriter.java:329)
    at io.confluent.influxdb.sink.writer.InfluxDBWriter.getBatch(InfluxDBWriter.java:149)
    at io.confluent.influxdb.sink.writer.InfluxDBWriter.write(InfluxDBWriter.java:126)
    at io.confluent.influxdb.sink.InfluxDBSinkTask.put(InfluxDBSinkTask.java:40)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
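One plausible reading of the ClassCastException in extractPointTags: the connector casts the record's tags to a Java Map, but the deserialized JSON delivered a List instead. A value shaped like this (the field name is assumed, for illustration only):

```json
{ "tags": [ { "site": "A" } ] }
```

deserializes to an ArrayList, whereas

```json
{ "tags": { "site": "A" } }
```

deserializes to a Map, which is what the writer's cast appears to expect.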
Does anyone have any ideas? I used a similar configuration to sink the same topic into MongoDB and it worked perfectly.
Should I use a different InfluxDB sink connector, or switch to Avro serialization, or is that unrelated?