我们增加了分区的数量来并行处理消息,因为消息的吞吐量很高。一旦我们增加了分区的数量,所有订阅到该主题的流线程都会死掉。我们更改了用户组id,然后重新启动了应用程序,它工作正常。
我知道应用程序的分区数changelog topic应该和源topic相同。我想知道这背后的原因。
我看到了这个链接-https://issues.apache.org/jira/browse/kafka-6063?jql=project%20%3d%20kafka%20and%20component%20%3d%20streams%20and%20text%20~%20%22分区%22
找不到原因
https://github.com/apache/kafka/blob/fdc742b1ade420682911b3e336ae04827639cc04/streams/src/main/java/org/apache/kafka/streams/processor/internals/internaltopicmanager.java#l122
基本上,这个if条件背后的原因。
2条答案
按热度按时间ycl3bljg1#
如果有状态客户端因此失败,则删除changelog主题和本地状态存储。
rlcwz9us2#
输入主题分区定义并行级别,如果您有聚合或联接等有状态操作,则这些操作的状态将在sharded中显示。如果你有x个输入主题分区,你会得到x个任务,每个任务有一个状态碎片。此外,状态由kafka中的changelog主题支持,该主题有x个分区,每个shard正好使用其中一个分区。
如果将输入主题分区的数目更改为x+1,则kafka streams将尝试使用x存储碎片创建x+1任务,但是退出的changelog主题只有x个分区。因此,应用程序的整个分区都会中断,kafka流无法保证正确的处理,因此会因错误而关闭。
另请注意,kafka流假定输入数据按键进行分区。如果更改输入主题分区的数量,基于哈希的分区也会更改可能导致不正确输出的内容。
通常,建议在开始时对主题进行过度分区,以避免此问题。如果确实需要扩展,最好用新的分区数创建一个新的主题,并并行地启动应用程序的副本(使用新的应用程序id)。之后,更新上游生产者应用程序以写入新主题,最后关闭旧应用程序。