我们在强制使用kafka connect elasticsearch连接器将来自kafka主题的消息发送到elasticsearch的顺序时遇到问题。在本主题中,消息的顺序正确,偏移量正确,但如果有两条消息连续快速创建具有相同id,则它们会以错误的顺序间歇性地发送到elasticsearch。这将导致elasticsearch具有来自第二条最后一条消息的数据,而不是来自最后一条消息的数据。如果我们在主题中的两条消息之间人为地增加一两秒的延迟,问题就会消失。
此处的文档说明:
使用分区级kafka偏移量作为文档版本,并使用 version_mode=external
.
但是我找不到任何关于这个的文档 version_mode
设置,以及是否是我们需要设置自己的地方。
在kafka connect系统的日志文件中,我们可以看到两条消息(对于相同的id)以错误的顺序处理,相隔几毫秒。可能很重要的一点是,它们看起来是在不同的线程中处理的。还要注意,这个主题只有一个分区,所以所有消息都在同一个分区中。
下面是日志片段,为清晰起见稍加编辑。kafka主题中的消息由debezium填充,我认为这与问题无关,但碰巧包含了timestamp值。这表明消息的处理顺序是错误的(尽管它们在kafka主题中的顺序是正确的,由debezium填充):
[2019-01-17 09:10:05,671] DEBUG http-outgoing-1 >> "
{
"op": "u",
"before": {
"id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
... << DATA FROM BEFORE SECOND UPDATE >> ...
},
"after": {
"id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
... << DATA FROM AFTER SECOND UPDATE >> ...
},
"source": { ... },
"ts_ms": 1547716205205
}
" (org.apache.http.wire)
...
[2019-01-17 09:10:05,696] DEBUG http-outgoing-2 >> "
{
"op": "u",
"before": {
"id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
... << DATA FROM BEFORE FIRST UPDATE >> ...
},
"after": {
"id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
... << DATA FROM AFTER FIRST UPDATE >> ...
},
"source": { ... },
"ts_ms": 1547716204190
}
" (org.apache.http.wire)
有人知道如何强制此连接器在向elasticsearch发送消息时维护给定文档id的消息顺序吗?
1条答案
按热度按时间piztneat1#
问题是我们的elasticsearch连接器
key.ignore
配置设置为true
.我们在连接器的github源代码中发现了这行代码(在dataconverter.java中):
这意味着
key.ignore=true
,正在生成并发送到elasticsearch的索引操作实际上是“无版本的”。。。基本上,elasticsearch为文档接收的最后一组数据将替换任何以前的数据,即使它是“旧数据”。通过查看日志文件,连接器似乎有几个使用者线程读取源主题,然后将转换后的消息传递给elasticsearch,但传递给elasticsearch的顺序不一定与主题顺序相同。
使用
key.ignore=false
,每个elasticsearch消息现在包含一个等于kafka记录偏移量的版本值,并且elasticsearch拒绝更新文档的索引数据,如果它已经接收到更高版本的数据。这不是唯一能解决这个问题的。我们仍然需要对kafka主题中的debezium消息进行转换,以将密钥转换为elasticsearch满意的纯文本格式: