Kafka MongoDB sink connector: mapping records to an existing ObjectId

Posted by t3irkdon on 2022-11-21 in Apache

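I am using the Kafka sink connector for MongoDB. I want to push some JSON documents from a Kafka topic into MongoDB, but I get an error when a document contains $oid.
Here is the error: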

{"name":"mongodb-sink-connector","connector":{"state":"RUNNING","worker_id":"localhost:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"localhost:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:610)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to write mongodb documents\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.bulkWriteBatch(MongoSinkTask.java:227)\n\tat java.base/java.util.ArrayList.forEach(ArrayList.java:1541)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:122)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)\n\t... 
10 more\nCaused by: java.lang.IllegalArgumentException: Invalid BSON field name $oid\n\tat org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:534)\n\tat com.mongodb.internal.connection.BsonWriterDecorator.writeName(BsonWriterDecorator.java:193)\n\tat org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:117)\n\tat org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:42)\n\tat org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)\n\tat org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:139)\n\tat org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:118)\n\tat org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:42)\n\tat com.mongodb.internal.connection.SplittablePayload$WriteRequestEncoder.encode(SplittablePayload.java:221)\n\tat com.mongodb.internal.connection.SplittablePayload$WriteRequestEncoder.encode(SplittablePayload.java:187)\n\tat org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:63)\n\tat org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:29)\n\tat com.mongodb.internal.connection.BsonWriterHelper.writeDocument(BsonWriterHelper.java:77)\n\tat com.mongodb.internal.connection.BsonWriterHelper.writePayload(BsonWriterHelper.java:59)\n\tat com.mongodb.internal.connection.CommandMessage.encodeMessageBodyWithMetadata(CommandMessage.java:162)\n\tat com.mongodb.internal.connection.RequestMessage.encode(RequestMessage.java:138)\n\tat com.mongodb.internal.connection.CommandMessage.encode(CommandMessage.java:59)\n\tat com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:268)\n\tat com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:100)\n\tat com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:490)\n\tat 
com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71)\n\tat com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:253)\n\tat com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:202)\n\tat com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:118)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation.executeCommand(MixedBulkWriteOperation.java:431)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation.executeBulkWriteBatch(MixedBulkWriteOperation.java:251)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation.access$700(MixedBulkWriteOperation.java:76)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:194)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:185)\n\tat com.mongodb.internal.operation.OperationHelper.withReleasableConnection(OperationHelper.java:621)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:185)\n\tat com.mongodb.internal.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:76)\n\tat com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:187)\n\tat com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:442)\n\tat com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:422)\n\tat com.mongodb.kafka.connect.sink.MongoSinkTask.bulkWriteBatch(MongoSinkTask.java:209)\n\t... 13 more\n"}],"type":"sink"}

Here is the message I produced to the Kafka topic:
{"_id": {"$oid": "634fd99b52281517a468f3a7"},"schema": {"type": "struct", "fields": [{"type": "int32","optional": true, "field": "id"}, {"type": "string", "optional": true, "field": "name"}, {"type": "string", "optional": true, "field": "middel_name"}, {"type": "string", "optional": true, "field": "surname"}],"optional": false, "name": "foobar"},"payload": {"id":45,"name":"mongo","middle_name": "mmp","surname": "kafka"}}

Here are the connector settings I used:

{
  "name": "mongodb-sink-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "migration-mongo",
    "connection.uri": "mongodb://abc:xyz@xx.xx.xx.01:27018,xx.xx.xx.02:27018,xx.xx.xx.03:27018/?authSource=admin&replicaSet=dev",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "document.id.strategy.overwrite.existing": "false",
    "validate.non.null": false,
    "database": "foo",
    "collection": "product"
  }
}

Answer #1 (d7v8vwbk)

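A Kafka Connect JsonConverter value should have *only* the "schema" and "payload" fields at the top level, not "_id". With that layout you also need "value.converter.schemas.enable": "true". If you set it to false instead, you can drop the "schema" and "payload" wrapper and put "_id" directly alongside the other fields of the document...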
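To make the two layouts concrete, here is a minimal Python sketch that builds both value shapes. The field values come from the question; the helper names `with_envelope` and `flat` are hypothetical, and using a plain string for `_id` is an assumption (whether an Extended JSON `$oid` wrapper is accepted depends on the converter and connector version).

```python
import json

# Fields taken from the question's payload.
record = {"id": 45, "name": "mongo", "middle_name": "mmp", "surname": "kafka"}


def with_envelope(payload):
    """Value layout for value.converter.schemas.enable=true: only schema + payload."""
    schema = {
        "type": "struct",
        "name": "foobar",
        "optional": False,
        "fields": [
            {"type": "int32", "optional": True, "field": "id"},
            {"type": "string", "optional": True, "field": "name"},
            # The question's schema spells this "middel_name", which does not
            # match the payload key; here the two are kept consistent.
            {"type": "string", "optional": True, "field": "middle_name"},
            {"type": "string", "optional": True, "field": "surname"},
        ],
    }
    return {"schema": schema, "payload": payload}


def flat(payload, doc_id):
    """Value layout for value.converter.schemas.enable=false: no envelope, _id inline."""
    # Assumption: _id is sent as a plain string, not as {"$oid": ...}.
    return {"_id": doc_id, **payload}


enveloped = with_envelope(record)
schemaless = flat(record, "634fd99b52281517a468f3a7")

print(json.dumps(enveloped))
print(json.dumps(schemaless))
```

Either shape can be produced to the topic, as long as the "value.converter.schemas.enable" setting matches the shape being sent.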
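The ID that the Mongo client uses is usually tied to the Kafka *record key* itself, not to anything embedded in the value as you have shown, but that depends on the ID strategy the connector is configured with.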
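As an illustration of the ID strategy point, the sketch below builds a sink config that takes `_id` from the record value. The `document.id.strategy` property and the `ProvidedInValueStrategy` class name are as I recall them from the MongoDB Kafka connector documentation, so treat them as assumptions and check them against your connector version. The connection URI is a placeholder, not the one from the question.

```python
import json

# Sink config sketch; topic, database and collection come from the question.
config = {
    "name": "mongodb-sink-connector",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "migration-mongo",
        "connection.uri": "mongodb://localhost:27017",  # placeholder URI
        "database": "foo",
        "collection": "product",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        # Assumption: this strategy reads _id from the value document.
        "document.id.strategy": (
            "com.mongodb.kafka.connect.sink.processor.id.strategy."
            "ProvidedInValueStrategy"
        ),
    },
}

# Print the JSON body that would be POSTed to the Connect REST API.
print(json.dumps(config, indent=2))
```

If the ID should come from the Kafka record key instead, the same package is documented as providing a `ProvidedInKeyStrategy`; that name is likewise an assumption to verify.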
