kafka connect elasticsearch没有索引文档

eit6fx6z  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(351)

我正在尝试设置一个测试,将数据从mysql移动到elasticsearch。
我有一个停靠设置,包括broker、zookeeper、connect、ksql服务器和cli、schema registry和elasticsearch。我使用的是来自ConfluentVersion5.1.0的docker图像,对于elasticsearch,我使用的是elasticsearch:6.5.4
我配置了一个jdbc连接器从mysql获取数据到kafka,这是工作的我看到我的主题创建和使用ksqlcli我可以看到流中的新消息,因为我更新mysql中的行。
我还配置了一个elasticsearch接收器连接器。连接器创建成功,elasticsearch中的索引也在那里,但我在elasticsearch索引中没有看到任何文档。
这是es接收器连接器配置:

{
    "name": "es-connector",
    "config": {
            "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://schema-registry:8081",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "connection.url": "http://es:9200",
            "type.name": "_doc",
            "topics": "test_topic",
            "drop.invalid.message": true,
            "behavior.on.null.values": "ignore",
            "behavior.on.malformed.documents": "ignore",
            "schema.ignore": true
    }
}

这是我在查询接收器连接器的状态时看到的: curl -X GET http://connect:8083/connectors/es-connector ```
{
"name": "es-connector",
"connector": {
"state": "RUNNING",
"worker_id": "connect:8083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "connect:8083"
}
],
"type": "sink"
}

在elasticsearch中,我可以看到索引 `http://es:9200/test_topic/_search` ```
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 0,
    "max_score": null,
    "hits": []
  }
}

我一直在mysql中进行更新和插入,我使用ksqlcli在流中看到消息,但在elasticsearch中没有创建任何文档。我甚至用手工创建了一个主题 kafka-avro-console-producer 和发布的消息,然后为这个主题创建了第二个接收器连接器,同样的结果,我看到了索引,但没有文档。
我看不到Kafka连接错误,所以我不明白为什么不工作。连接器配置有问题吗?我错过什么了吗?
编辑:
对于elasticsearch接收器配置,我尝试了使用和不使用以下行:

"drop.invalid.message": true,
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"schema.ignore": true

结果是一样的,没有文件。
编辑
我发现错误:
键用作文档id,不能为空
.

vqlkdk9b

vqlkdk9b1#

"key.ignore": true

elasticsearch接收器将使用主题+分区+偏移量作为elasticsearch文档id。如您所发现的,您将为每条消息获得一个新文档。

"key.ignore": false

elasticsearch接收器将使用kafka消息的密钥作为elasticsearch文档id。如果您的kafka消息中没有密钥,则可以理解会出现错误 Key is used as document id and cannot be null . 您可以使用各种方法来设置Kafka消息中的密钥,包括单个消息转换来设置Kafka消息密钥(如果您是通过Kafka连接接收的),详细信息如下。

相关问题