带kafka connect elasticsearch连接器的邮件订单

agxfikkp  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(390)

我们在强制使用kafka connect elasticsearch连接器将来自kafka主题的消息发送到elasticsearch的顺序时遇到问题。在本主题中,消息的顺序正确,偏移量正确,但如果有两条消息连续快速创建具有相同id,则它们会以错误的顺序间歇性地发送到elasticsearch。这将导致elasticsearch具有来自第二条最后一条消息的数据,而不是来自最后一条消息的数据。如果我们在主题中的两条消息之间人为地增加一两秒的延迟,问题就会消失。
此处的文档说明:
使用分区级kafka偏移量作为文档版本,并使用 version_mode=external .
但是我找不到任何关于这个的文档 version_mode 设置,以及是否是我们需要设置自己的地方。
在kafka connect系统的日志文件中,我们可以看到两条消息(对于相同的id)以错误的顺序处理,相隔几毫秒。可能很重要的一点是,它们看起来是在不同的线程中处理的。还要注意,这个主题只有一个分区,所以所有消息都在同一个分区中。
下面是日志片段,为清晰起见稍加编辑。kafka主题中的消息由debezium填充,我认为这与问题无关,但碰巧包含了timestamp值。这表明消息的处理顺序是错误的(尽管它们在kafka主题中的顺序是正确的,由debezium填充):

[2019-01-17 09:10:05,671] DEBUG http-outgoing-1 >> "
{
  "op": "u",
  "before": {
    "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
    ... << DATA FROM BEFORE SECOND UPDATE >> ...
  },
  "after": {
    "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
    ... << DATA FROM AFTER SECOND UPDATE >> ...
  },
  "source": { ... },
  "ts_ms": 1547716205205
}
" (org.apache.http.wire)

...

[2019-01-17 09:10:05,696] DEBUG http-outgoing-2 >> "
{
  "op": "u",
  "before": {
    "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
    ... << DATA FROM BEFORE FIRST UPDATE >> ...
  },
  "after": {
    "id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
    ... << DATA FROM AFTER FIRST UPDATE >> ...
  },
  "source": { ... },
  "ts_ms": 1547716204190
}
" (org.apache.http.wire)

有人知道如何强制此连接器在向elasticsearch发送消息时维护给定文档id的消息顺序吗?

piztneat

piztneat1#

问题是我们的elasticsearch连接器 key.ignore 配置设置为 true .
我们在连接器的github源代码中发现了这行代码(在dataconverter.java中):

final Long version = ignoreKey ? null : record.kafkaOffset();

这意味着 key.ignore=true ,正在生成并发送到elasticsearch的索引操作实际上是“无版本的”。。。基本上,elasticsearch为文档接收的最后一组数据将替换任何以前的数据,即使它是“旧数据”。
通过查看日志文件,连接器似乎有几个使用者线程读取源主题,然后将转换后的消息传递给elasticsearch,但传递给elasticsearch的顺序不一定与主题顺序相同。
使用 key.ignore=false ,每个elasticsearch消息现在包含一个等于kafka记录偏移量的版本值,并且elasticsearch拒绝更新文档的索引数据,如果它已经接收到更高版本的数据。
这不是唯一能解决这个问题的。我们仍然需要对kafka主题中的debezium消息进行转换,以将密钥转换为elasticsearch满意的纯文本格式:

"transforms": "ExtractKey",
"transforms.ExtractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractKey.field": "id"

相关问题