我正在尝试设置一个测试,将数据从mysql移动到elasticsearch。
我有一个停靠设置,包括broker、zookeeper、connect、ksql服务器和cli、schema registry和elasticsearch。我使用的是来自ConfluentVersion5.1.0的docker图像,对于elasticsearch,我使用的是elasticsearch:6.5.4
我配置了一个jdbc连接器从mysql获取数据到kafka,这是工作的我看到我的主题创建和使用ksqlcli我可以看到流中的新消息,因为我更新mysql中的行。
我还配置了一个elasticsearch接收器连接器。连接器创建成功,elasticsearch中的索引也在那里,但我在elasticsearch索引中没有看到任何文档。
这是es接收器连接器配置:
{
"name": "es-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"connection.url": "http://es:9200",
"type.name": "_doc",
"topics": "test_topic",
"drop.invalid.message": true,
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"schema.ignore": true
}
}
这是我在查询接收器连接器的状态时看到的: curl -X GET http://connect:8083/connectors/es-connector
```
{
"name": "es-connector",
"connector": {
"state": "RUNNING",
"worker_id": "connect:8083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "connect:8083"
}
],
"type": "sink"
}
在elasticsearch中,我可以看到索引 `http://es:9200/test_topic/_search` ```
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 0,
"max_score": null,
"hits": []
}
}
我一直在mysql中进行更新和插入,我使用ksqlcli在流中看到消息,但在elasticsearch中没有创建任何文档。我甚至用手工创建了一个主题 kafka-avro-console-producer
和发布的消息,然后为这个主题创建了第二个接收器连接器,同样的结果,我看到了索引,但没有文档。
我看不到Kafka连接错误,所以我不明白为什么不工作。连接器配置有问题吗?我错过什么了吗?
编辑:
对于elasticsearch接收器配置,我尝试了使用和不使用以下行:
"drop.invalid.message": true,
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"schema.ignore": true
结果是一样的,没有文件。
编辑
我发现错误:
键用作文档id,不能为空
.
1条答案
按热度按时间vqlkdk9b1#
与
elasticsearch接收器将使用主题+分区+偏移量作为elasticsearch文档id。如您所发现的,您将为每条消息获得一个新文档。
与
elasticsearch接收器将使用kafka消息的密钥作为elasticsearch文档id。如果您的kafka消息中没有密钥,则可以理解会出现错误
Key is used as document id and cannot be null
. 您可以使用各种方法来设置Kafka消息中的密钥,包括单个消息转换来设置Kafka消息密钥(如果您是通过Kafka连接接收的),详细信息如下。