当我在Kafka中运行changefeed时,它会发出一段时间的消息,但随后就会卡住。在作业状态或日志中,我看到错误kafka server: Message was too large, server rejected it to avoid allocation error.
这是什么意思,我该如何修复它?
当我在Kafka中运行changefeed时,它会发出一段时间的消息,但随后就会卡住。在作业状态或日志中,我看到错误kafka server: Message was too large, server rejected it to avoid allocation error.
这是什么意思,我该如何修复它?
1条答案
按热度按时间xqnpmsa81#
Changefeeds每行发出一条消息,每条消息的大小与更改的数据库行的大小成正比。如果你的最大行长度足够大,一个批处理,甚至一个单独的消息,可能会比你的Kafka服务器配置支持的更大。这通常发生在jsonb列中。这最终会阻止changefeed在包含这些大行的范围上取得进展,甚至可能导致重复重试,从而导致与较大消息一起批量处理的较小消息下游出现大量重复。
如果可能的话,最简单的解决方案是增加Kafka的最大消息大小。这个答案将告诉你如何调整代理配置。
如果你无法调整Kafka的代理设置,你可以使用
kafka_sink_config
配置一些客户端设置,如这里所述。默认情况下,Kafka changefeeds被配置为最小化消息大小,但如果消息传入的速度比发送的速度快,则会对消息进行批处理。因此,完全禁用批处理可以防止此问题。在CockroachDB的最新版本中,在SQL中执行
SET CLUSTER SETTING changefeed.batch_reduction_retry_enabled = true
将允许实验行为来减少批量大小以响应错误,只要没有单个行太大,这就可以解决问题。现有的changefeeds将需要暂停和恢复以拾取设置。CockroachDB的未来版本将默认启用此行为。最后一个办法是删除或压缩表中有问题的行,然后启动changefeed的副本。请注意,简单地恢复现有的提要,或者使用游标从行被固化之前开始一个提要,都是不起作用的,因为提要仍然会尝试发出行的旧版本。