我正在使用从kafka到mongo的mongokafka sink连接器,连接器jar是mongo-kafka-connect-1.7.0-all.jar。我还在连接集群的插件路径中添加了mongodb-driver-core-4.5.0.jar。我正在使用strimzi kafka和连接。在kafka中,我可以成功发送消息,但在连接器中,我可以看到错误。
curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-sink",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri":"mongodb://mongoservice:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"topicData",
"topics":"q4.s4"
}
}
' \
http://localhost:8083/connectors -w "\n"
示例消息:
{"id": 1, "name": "hello"}
连接器错误:{"name":"mongo-sink","connector":{"state":"RUNNING","worker_id":"localhost:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:496)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:328)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:540)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:496)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\t... 13 more\n"}],"type":"sink"}
我尝试在连接器配置中设置“schemas.enable”:“false”,但仍然出现相同的问题。
我是否需要启动任何其他服务,如schema registry?目前我使用的是strimzi Kafka,并连接到mongodbv5.0.3和mongo-kafka-connect-1.7.0-all.jar,mongodb-driver-core-4.5.0.jar。
2条答案
按热度按时间vltsax251#
我尝试在连接器配置中设置"schemas. enable":"false",但仍然出现相同的问题。
错误是针对转换器,而不是连接器。该配置不是有效的Mongo连接器属性。
由于您只对记录的值感兴趣,因此设置
"value.converter.schemas.enable" : "false"
我是否需要启动任何其他服务,如schema registry
如果您使用
kafka-avro-console-producer
或类似工具发送记录,则是。目前我正在使用strimziKafka
您可以使用
KafkaConnector
CRD,而不是直接调用Connect API。lxkprmvk2#
这可能不是直接的答案,但可能是有帮助的。在我的例子“Confluent S3 Sink Connector”中,请求参数中的配置缓存在主题的配置中。所以,我不能在不删除该主题的情况下覆盖配置值。这不是写在Confluent文档中的...