MapperParsingException when indexing a Kafka topic into Elasticsearch with the Kafka Connect Elasticsearch sink connector

Asked by xkftehaa on 2021-06-04 in Kafka

I am using Elasticsearch 7.4 and the Confluent Kafka Connect Elasticsearch sink connector.
1) Created the Kafka Elasticsearch sink connector in Confluent with the following configuration:

curl -XPOST -H 'Content-type:application/json' '<ip>:8083/connectors' -d '{
 "name" : "sink-kafka-elasticsearch",
 "config" : {
  "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max" : "1",
  "topics" : "kaf.newdb.col2", 
  "key.converter":"org.apache.kafka.connect.storage.StringConverter",
  "value.converter":"org.apache.kafka.connect.storage.StringConverter",  
  "connection.url" : "<ip>:9200",
  "type.name" : "test.type",
  "key.ignore" : "true",
  "schema.ignore" : "true"
 }
}'

2) When I try to search the data in Elasticsearch with:

curl -XGET 'http://<ip>:9200/kaf.newdb.col2/_search?pretty'

I get this error:

failed to execute bulk item (index) index {[kaf.newdb.col2][test.type][kaf.newdb.col2+0+0], source["{\"_id\": {\"_data\": \"825DF0D3EF000000012B022C0100296E5A1004FBE317F12D384855AD2528B1E5B0581046645F696400645DF0D3EF09786CAA9967C44F0004\"}, \"operationType\": \"insert\", \"clusterTime\": {\"$timestamp\": {\"t\": 1576063983, \"i\": 1}}, \"fullDocument\": {\"_id\": {\"$oid\": \"5df0d3ef09786caa9967c44f\"}, \"id\": 8.0, \"SKU\": 1.0, \"item_name\": \"Hello World\", \"quantity\": 10.0}, \"ns\": {\"db\": \"newdb\", \"coll\": \"col2\"}, \"documentKey\": {\"_id\": {\"$oid\": \"5df0d3ef09786caa9967c44f\"}}}"]}

org.elasticsearch.index.mapper.MapperParsingException: failed to parse
        at org.elasticsearch.index.mapper.DocumentParser.wrapInMapperParsingException(DocumentParser.java:191) ~[elasticsearch-7.4.2.jar:7.4.2]
        at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:74) ~[elasticsearch-7.4.2.jar:7.4.2]
        at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:267) ~[elasticsearch-7.4.2.jar:7.4.2]

Has anyone run into this? Please help me resolve it.
Additional info:
1) The data arrives in the Kafka topic from MongoDB.
2) I am using the Confluent MongoDB source connector for Kafka to stream the MongoDB data into the topic.
MongoDB data:

{ "_id" : ObjectId("5df0d3ef09786caa9967c44f"), "id" : 8, "SKU" : 1, "item_name" : "Hello World", "quantity" : 10 }
{ "_id" : ObjectId("5df0d40509786caa9967c450"), "id" : 10, "SKU" : 1, "item_name" : "Hello World", "quantity" : 40 }
{ "_id" : ObjectId("5df0d48b09786caa9967c451"), "id" : 10, "SKU" : 1, "item_name" : "Hello World", "quantity" : 40 }

Kafka MongoDB source connector configuration:

curl -X PUT http://<ip>:8083/connectors/src-mongodb-newdb/config -H "Content-Type: application/json" -d '{
      "tasks.max":1,
      "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
      "key.converter":"org.apache.kafka.connect.json.JsonConverter",
      "value.converter":"org.apache.kafka.connect.json.JsonConverter",
      "connection.uri":"mongodb://<uid>:<pwd>@<ip:port>",
      "database":"newdb",
      "collection":"col2",
      "pipeline":"[{\"$match\": { \"$and\": [ { \"updateDescription.updatedFields.quantity\" : { \"$lte\": 5 } }, {\"operationType\": \"update\"}]}}]", 
      "topic.prefix": ""
}'
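For reference (not a verified fix for this exact setup): the MongoDB Kafka source connector documents a `publish.full.document.only` option. With it enabled, the topic messages would carry only the `fullDocument` body rather than the whole change-stream envelope, i.e. the source connector config above would gain a line like:

```json
{
  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
  "publish.full.document.only": "true"
}
```

Whether that alone resolves the mapper error here would still need testing against this pipeline.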
