I am using Elasticsearch 7.4 and the Confluent Kafka Elasticsearch sink connector.
1) I created the Kafka Elasticsearch sink connector in Confluent with the following configuration:
curl -XPOST -H 'Content-type:application/json' '<ip>:8083/connectors' -d '{
"name" : "sink-kafka-elasticsearch",
"config" : {
"connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max" : "1",
"topics" : "kaf.newdb.col2",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"connection.url" : "<ip>:9200",
"type.name" : "test.type",
"key.ignore" : "true",
"schema.ignore" : "true"
}
}'
2) I then tried to search for the data in Elasticsearch with this command:
curl -XGET 'http://<ip>:9200/kaf.newdb.col2/_search?pretty'
and got the following error:
failed to execute bulk item (index) index {[kaf.newdb.col2][test.type][kaf.newdb.col2+0+0], source["{\"_id\": {\"_data\": \"825DF0D3EF000000012B022C0100296E5A1004FBE317F12D3
84855AD2528B1E5B0581046645F696400645DF0D3EF09786CAA9967C44F0004\"},
\"operationType\": \"insert\", \"clusterTime\":
{\"$timestamp\": {\"t\": 1576063983, \"i\": 1}},
\"fullDocument\":
{\"_id\": {\"$oid\": \"5df0d3ef09786caa9967c44f\"}, \"id\": 8.0, \"SKU\": 1.0, \"item_name\": \"Hello World\", \"quantity\": 10.0}, \"ns\": {\"db\": \"newdb\", \"coll\": \"col2\"}, \"documentKey\": {\"_id\": {\"$oid\": \"5df0d3ef09786caa9967c44f\"}}}"]}
org.elasticsearch.index.mapper.MapperParsingException: failed to parse
at org.elasticsearch.index.mapper.DocumentParser.wrapInMapperParsingException(DocumentParser.java:191) ~[elasticsearch-7.4.2.jar:7.4.2]
at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:74) ~[elasticsearch-7.4.2.jar:7.4.2]
at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:267) ~[elasticsearch-7.4.2.jar:7.4.2]
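One way to read the log above: the `source[...]` value begins with a quote and contains escaped quotes, which suggests the body sent to Elasticsearch is a JSON string that wraps the document rather than a JSON object. Elasticsearch expects an object as the document source, so this would be consistent with the `MapperParsingException`. The sketch below reproduces that shape with a shortened, hypothetical stand-in for the logged event; it is an illustration of the encoding, not a confirmed diagnosis.

```python
import json

# Shortened, hypothetical stand-in for the change event seen in the log.
doc = {"operationType": "insert", "fullDocument": {"id": 8.0, "quantity": 10.0}}

# Encoding twice produces the same shape as the logged source:
# an outer pair of quotes with escaped quotes inside.
double_encoded = json.dumps(json.dumps(doc))
print(double_encoded[:24])

# The first decode yields a string, not an object.
first = json.loads(double_encoded)
print(type(first).__name__)

# Only a second decode recovers the actual document.
second = json.loads(first)
print(type(second).__name__)
```

If the first decode of a message from the topic is a string rather than an object, the value was probably serialized as JSON twice somewhere between the source connector and the sink.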
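Has anyone tried this? Please help me resolve this issue.
Additional information:
1) The data arrives in the Kafka topic from MongoDB.
2) I am using the Confluent Kafka MongoDB source connector to send the MongoDB data to the Kafka topic.
MongoDB data: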
{ "_id" : ObjectId("5df0d3ef09786caa9967c44f"), "id" : 8, "SKU" : 1, "item_name" : "Hello World", "quantity" : 10 }
{ "_id" : ObjectId("5df0d40509786caa9967c450"), "id" : 10, "SKU" : 1, "item_name" : "Hello World", "quantity" : 40 }
{ "_id" : ObjectId("5df0d48b09786caa9967c451"), "id" : 10, "SKU" : 1, "item_name" : "Hello World", "quantity" : 40 }
Kafka MongoDB source connector configuration:
curl -X PUT http://<ip>:8083/connectors/src-mongodb-newdb/config -H "Content-Type: application/json" -d '{
"tasks.max":1,
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"connection.uri":"mongodb://<uid>:<pwd>@<ip:port>",
"database":"newdb",
"collection":"col2",
"pipeline":"[{\"$match\": { \"$and\": [ { \"updateDescription.updatedFields.quantity\" : { \"$lte\": 5 } }, {\"operationType\": \"update\"}]}}]",
"topic.prefix": ""
}'
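A possible direction, offered as an untested sketch: the MongoDB source connector emits each change event as a JSON string, so pairing it with `JsonConverter` may encode the value a second time, and the sink's `StringConverter` would then pass that string to Elasticsearch unchanged. The overrides below are assumptions and were not part of the original configurations: `StringConverter` for the source value, and `JsonConverter` with `value.converter.schemas.enable` set to `false` for the sink value. The helper `apply_overrides` is hypothetical and only merges dictionaries; it does not call the Connect REST API.

```python
import json

# Sink settings copied from the question.
sink_config = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "kaf.newdb.col2",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connection.url": "<ip>:9200",
    "type.name": "test.type",
    "key.ignore": "true",
    "schema.ignore": "true",
}

# Assumed changes (not from the original post).
source_overrides = {
    # Write the change event to the topic as plain JSON text.
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
}
sink_overrides = {
    # Parse the topic value as schemaless JSON before indexing.
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
}


def apply_overrides(config, overrides):
    """Return a copy of config with the overrides applied on top."""
    return {**config, **overrides}


new_sink_config = apply_overrides(sink_config, sink_overrides)
print(json.dumps(new_sink_config, indent=2))
print(json.dumps(source_overrides, indent=2))
```

The printed JSON could be used as the body of the same `curl` calls shown above. Records already written to the topic would keep their old encoding, so a test with a fresh topic or index may be needed to see any effect.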