无法将kafka主题放入XDB

rekjcdws  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(246)

我的主题模式是这样的,

{
  "schema": {
    "type": "string",
    "optional": false
  },
  "payload": "{...}"
}

我正试图将其放入具有此配置的inflox db示例中,

curl -X PUT \
     -H "Content-Type: application/json" \
     --data '{
               "connector.class":"io.confluent.influxdb.InfluxDBSinkConnector",
                    "tasks.max": "1",
                    "topics" : "mongo-sink-json.tractor.job",
                    "influxdb.url": "http://10.100.87.169:8086",
                    "influxdb.db": "mongo-sink",
                    "infuxdb.password": "password",
                    "influxdb.username": "user",
                    "measurement.name.format": "${topic}",
                    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
                    "value.converter.schemas.enable": "true"
          }' \

但我在Kafka连接日志中发现这个错误,
error workersinktask{id=inflox\u sink\u json\u job-0}任务引发了未捕获且不可恢复的异常。任务正在终止,在手动重新启动之前不会恢复。错误:java.util.arraylist无法转换为java.util.map(org.apache.kafka.connect.runtime.workersinktask)java.lang.classcastexception:java.util.arraylist无法转换为io.confluent.influxdb.sink.writer.influxdbwriter.extractpointtags(influxdbwriter)处的java.util.map。java:288)在io.confluent.influxdb.sink.writer.influxdbwriter.getpointtags(influxdbwriter。java:266)at io.confluent.influxdb.sink.writer.influxdbwriter.getkey(influxdbwriter。java:329)at io.confluent.influxdb.sink.writer.influxdbwriter.getbatch(influxdbwriter。java:149)at io.confluent.influxdb.sink.writer.influxdbwriter.write(influxdbwriter。java:126)在io.confluent.influxdb.sink.influxdbsinktask.put(influxdbsinktask。java:40)在org.apache.kafka.connect.runtime.workersinktask.delivermessages(workersinktask。java:546)在org.apache.kafka.connect.runtime.workersinktask.poll(workersinktask。java:326)在org.apache.kafka.connect.runtime.workersinktask.iteration(workersinktask。java:228)在org.apache.kafka.connect.runtime.workersinktask.execute(workersinktask)。java:196)在org.apache.kafka.connect.runtime.workertask.dorun(workertask。java:184)在org.apache.kafka.connect.runtime.workertask.run(workertask。java:234)在java.util.concurrent.executors$runnableadapter.call(executors。java:511)在java.util.concurrent.futuretask.run(futuretask。java:266)位于java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1149)在java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:624)在java.lang.thread.run(线程。java:748) [2020-09-25 20:46:53,519]error workersinktask{id=influx\u sink\u json\u job-0}任务引发了未捕获且不可恢复的异常(org.apache.kafka.connect.runtime.workertask)org.apache.kafka.connect.errors.connectexception:由于不可恢复的异常而退出workersinktask。在org.apache.kafka.connect.runtime.workersinktask.delivermessages(workersinktask。java:568)在org.apache.kafka.connect.runtime.workersinktask.poll(workersinktask。java:326)在org.apache.kafka.connect.runtime.workersinktask.iteration(workersinktask。java:228)在org.apache.kafka.connect.runtime.workersinktask.execute(workersinktask。java:196)在org.apache.kafka.connect.runtime.workertask.dorun(工作任务。java:184)在org.apache.kafka.connect.runtime.workertask.run(workertask。java:234)在java.util.concurrent.executors$runnableadapter.call(executors。java:511)在java.util.concurrent.futuretask.run(futuretask。java:266)在java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1149)在java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:624)在java.lang.thread.run(线程。java:748)原因:java.lang.classcastexception:java.util.arraylist无法在处转换为java.util.mapio.confluent.influxdb.sink.writer.influxdbwriter.extractpointtags(influxdbwriter。java:288)at io.confluent.influxdb.sink.writer.influxdbwriter.getpointtags(influxdbwriter。java:266)at io.confluent.influxdb.sink.writer.influxdbwriter.getkey(influxdbwriter。java:329)在io.confluent.influxdb.sink.writer.influxdbwriter.getbatch(influxdbwriter。java:149)at io.confluent.influxdb.sink.writer.influxdbwriter.write(influxdbwriter。java:126)在io.confluent.influxdb.sink.influxdbsinktask.put(influxdbsinktask。java:40)在org.apache.kafka.connect.runtime.workersinktask.delivermessages(workersinktask。java:546)
有人有什么想法吗?我在mongodb中使用了类似的配置来接收相同的主题,它工作得非常好。
我应该使用其他XDB接收器连接器,还是使用avro序列化,或者这是无关的?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题