Streaming data from MongoDB to Elasticsearch with Kafka Connect

w8rqjzmb · posted 2021-06-05 in Kafka

I'm trying to stream data from MongoDB to Elasticsearch using Kafka Connect.
The data that the MongoDB source connector streams to Kafka looks like this:

{
   "updatedAt" : {
      "$date" : 1591596275939
   },
   "createdAt" : {
      "$date" : 1362162600000
   },
   "name" : "my name",
   "_id" : {
      "$oid" : "5ee0cc7e0c3273f3d4a3c20f"
   },
   "documentId" : "mydoc1",
   "age" : 20,
   "language" : "English",
   "validFrom" : {
      "$date" : 978307200000
   },
   "remarks" : [
      "remarks"
   ],
   "married" : false
}

When saving the data to Elasticsearch, I run into two problems:
1. _id is an object, and I want to use the "documentId" field as the _id in Elasticsearch.
2. The dates are objects with a $date key, and I don't know how to convert them into plain dates.
Can anyone point me in the right direction on these two problems?
MongoDB source configuration:

{
   "tasks.max" : "5",
   "change.stream.full.document" : "updateLookup",
   "name" : "mongodb-source",
   "value.converter" : "org.apache.kafka.connect.storage.StringConverter",
   "collection" : "collection",
   "poll.max.batch.size" : "1000",
   "connector.class" : "com.mongodb.kafka.connect.MongoSourceConnector",
   "batch.size" : "1000",
   "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
   "key.converter.schemas.enable":"false",
   "value.converter.schemas.enable":"false",
   "connection.uri" : "mongodb://connection",
   "publish.full.document.only" : "true",
   "database" : "databasename",
   "poll.await.time.ms" : "5000",
   "topic.prefix" : "mongodb"
}
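
For the $date problem, one option might be to make the source connector emit simplified JSON instead of MongoDB Extended JSON. A sketch of the extra source properties, assuming MongoDB Kafka Connector 1.3+ where the output.json.formatter option is available (ObjectIds should then come out as plain hex strings and dates as ISO-8601 strings):

{
   "output.format.value" : "json",
   "output.json.formatter" : "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson"
}

With schema.ignore=true on the sink (below), Elasticsearch's dynamic mapping should then be able to detect the ISO-8601 strings as dates on its own.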

Elasticsearch sink configuration:

{
   "write.method" : "upsert",
   "errors.deadletterqueue.context.headers.enable" : "true",
   "name" : "elasticsearch-sink",
   "connection.password" : "password",
   "topic.index.map" : "mongodb.databasename.collection:elasticindexname",
   "connection.url" : "http://localhost:9200",
   "errors.log.enable" : "true",
   "flush.timeout.ms" : "20000",
   "errors.log.include.messages" : "true",
   "key.ignore" : "false",
   "type.name" : "_doc",
   "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
   "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
   "key.converter.schemas.enable":"false",
   "value.converter.schemas.enable":"false",
   "tasks.max" : "1",
   "batch.size" : "100",
   "schema.ignore" : "true",
   "schema.enable" : "false",
   "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
   "read.timeout.ms" : "6000",
   "connection.username" : "elastic",
   "topics" : "mongodb.databasename.collection",
   "proxy.host": "localhost",
   "proxy.port": "8080"
}
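
For the _id problem, a common approach is to promote the documentId field into the record key with the built-in Kafka Connect transforms, so that key.ignore=false makes the sink use it as the Elasticsearch document id. A sketch of the extra sink properties (the aliases createKey and extractId are arbitrary names; the transform classes are standard Apache Kafka SMTs):

{
   "transforms" : "createKey,extractId",
   "transforms.createKey.type" : "org.apache.kafka.connect.transforms.ValueToKey",
   "transforms.createKey.fields" : "documentId",
   "transforms.extractId.type" : "org.apache.kafka.connect.transforms.ExtractField$Key",
   "transforms.extractId.field" : "documentId"
}

After ExtractField$Key the record key is the plain string "mydoc1", which should also make the MAP exception below go away.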

Exception

Caused by: org.apache.kafka.connect.errors.DataException: MAP is not supported as the document id.
        at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:107)
        at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:182)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:291)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:276)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:174)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
        ... 10 more
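
The MAP error itself seems to come from the key handling: with key.ignore=false the sink uses the record key as the document id, and the key produced by the source is presumably the stringified Mongo key, which JsonConverter with schemas disabled deserializes into a map:

{ "_id" : { "$oid" : "5ee0cc7e0c3273f3d4a3c20f" } }

The sink appears to accept only primitive key types as document ids, hence the exception; the ValueToKey/ExtractField sketch above would sidestep this.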

Connector links:

https://docs.mongodb.com/kafka-connector/master/kafka-source/
https://docs.confluent.io/current/connect/kafka-connect-elasticsearch
