samza任务未在一个分区上接收

kyvafyod  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(345)

我的一个samza任务有一个令人费解的问题。除了一个分区上的消息外,它工作正常。我有9个分区的主题。如果我发送1000条信息,我只收到大约890条。
我已经向kafka控制台使用者检查了分区键,我知道它不会被samza作业处理,控制台使用者确实看到了消息,所以我知道它正在写入主题,至少一个普通使用者可以很好地看到它。
我已经在samza上启用了调试日志记录,并且来自 org.apache.samza.checkpoint.kafka.KafkaCheckpointManager 也就是说:
为taskname分区4添加检查点checkpoint[offset={systemstreampartition[kafka,com.mycompany.indexing.document,4]=448}]
分区4总是写着448。分区0也有类似的日志,但是在显示448的地方,这个数字在稳步增长。
我很乐意分享任何有助于缩小范围的有趣的配置信息,但现在,我对我将要分享的东西有点困惑。
我是作为 ThreadJobFactory 使用:
samza-kafka 2.10版本0.9.1
Kafka2.10版本0.8.2.1在客户端
Kafka经纪人0.9.0.0
更新
我查看了一个使用相同分区键的上游samza作业,发现了分区4上游的问题。用kafkacat检查samza检查点主题,我看到分区4的检查点没有前进。首先我看到:

{"SystemStreamPartition [kafka, resource.mutation, 6]":{"system":"kafka","partition":"6","offset":"96639","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 3]":{"system":"kafka","partition":"3","offset":"47135","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 0]":{"system":"kafka","partition":"0","offset":"49476","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 4]":{"system":"kafka","partition":"4","offset":"2556","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 8]":{"system":"kafka","partition":"8","offset":"62263","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 1]":{"system":"kafka","partition":"1","offset":"52151","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 7]":{"system":"kafka","partition":"7","offset":"58081","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 5]":{"system":"kafka","partition":"5","offset":"47712","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 2]":{"system":"kafka","partition":"2","offset":"45831","stream":"resource.mutation"}}
% Reached end of topic __samza_checkpoint_ver_1_for_resource-normalizer_1 [0] at offset 81713

一分钟后我看到:

{"SystemStreamPartition [kafka, resource.mutation, 6]":{"system":"kafka","partition":"6","offset":"96624","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 3]":{"system":"kafka","partition":"3","offset":"47115","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 0]":{"system":"kafka","partition":"0","offset":"49462","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 4]":{"system":"kafka","partition":"4","offset":"2556","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 8]":{"system":"kafka","partition":"8","offset":"62252","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 1]":{"system":"kafka","partition":"1","offset":"52134","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 7]":{"system":"kafka","partition":"7","offset":"58063","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 5]":{"system":"kafka","partition":"5","offset":"47696","stream":"resource.mutation"}}
{"SystemStreamPartition [kafka, resource.mutation, 2]":{"system":"kafka","partition":"2","offset":"45817","stream":"resource.mutation"}}
% Reached end of topic __samza_checkpoint_ver_1_for_resource-normalizer_1 [0] at offset 81722

数字不会超过2556。但是,看看 resource.mutation 在分区4上,最后一个偏移量的范围与其他偏移量相似,到目前为止大约为61000个偏移量,并且还在增长。
根本没有错误消息或警告消息。它只是停止消耗分区4。

wn9m85ua

wn9m85ua1#

问题是有一条消息超出了默认值 max.message.bytes 对于Kafka消费者来说。但是,负责使用该分区的线程不会给出任何类型的错误消息,而是挂起该消息。其他分区线程将继续愉快地运行。
一旦我们配置好 systems.kafka.consumer.fetch.message.max.bytes 如果值足够大,可以使用分区上的每条消息并重新启动作业,那么它就会恢复原来的状态,一切都按预期开始工作。

相关问题