指向同一主题的所有示例本地存储

vc6uscn9  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(304)

我们有以下问题:
我们想要监听某个kafka主题并构建它的“历史记录”——因此对于指定的键,提取一些数据,将其添加到该键的现有列表中(如果该键不存在,则创建一个新的列表),然后将其放到另一个主题中,该主题只有一个分区并且高度压缩。另一个应用程序只需监听该主题并更新其历史记录列表。
我在想它和Kafka图书馆有什么关系。我们当然可以使用聚合:

msgReceived.map((key, word) -> new KeyValue<>(key, word))
           .groupBy((k,v) -> k, stringSerde, stringSerde)
           .aggregate(String::new,
                     (k, v, stockTransactionCollector) -> stockTransactionCollector + "|" + v,
                     stringSerde, "summaries2")
           .to(stringSerde, stringSerde, "transaction-summary50");

它创建一个由kafka支持的本地存储并将其用作历史表。
我担心的是,如果我们决定扩展这样的应用程序,每个正在运行的示例将创建一个新的主题 ${applicationId}-${storeName}-changelog (我假设每个应用程序都有不同的 applicationId ). 每个示例开始使用输入主题,获取不同的键集并构建不同的状态子集。如果kafka决定重新平衡,一些示例将开始错过本地存储中的一些历史状态,因为它们将获得一组全新的分区来使用。
问题是,如果我只是为每个正在运行的示例设置相同的applicationid,那么它最终是否应该重放来自每个正在运行的示例具有相同本地状态的kafka主题的所有数据?

qv7cva1a

qv7cva1a1#

我担心的是,如果我们决定扩展这样的应用程序,每个正在运行的示例将创建一个新的备份主题${applicationid}-${storename}-changelog(我假设每个应用程序都有不同的applicationid)。每个示例开始使用输入主题,获取不同的键集并构建不同的状态子集。如果kafka决定重新平衡,一些示例将开始错过本地存储中的一些历史状态,因为它们将获得一组全新的分区来使用。
有些假设不正确:
如果您运行应用程序的多个示例来扩展应用程序,则所有示例都必须具有相同的应用程序id(参见kafka的消费者组管理协议)——否则,将不会共享负载,因为每个示例都将被视为自己的应用程序,并且每个示例都将分配所有分区。
因此,如果所有示例都使用相同的应用程序id,则所有正在运行的应用程序示例都将使用相同的changelog主题名,因此,您要做的事情应该是现成的。

lpwwtiir

lpwwtiir2#

为什么要创建多个具有不同id的应用程序来执行相同的任务?Kafka实现并行性的方式是通过任务:
应用程序的处理器拓扑是通过将其分解为多个任务来扩展的。
更具体地说,kafka streams基于应用程序的输入流分区创建固定数量的任务,每个任务分配一个来自输入流的分区列表(即kafka主题)。分区对任务的分配从不改变,因此每个任务都是应用程序的固定并行单元。
然后,任务可以基于分配的分区示例化自己的处理器拓扑;它们还为每个分配的分区维护一个缓冲区,并从这些记录缓冲区一次处理一条消息。因此,流任务可以独立地并行处理,而无需人工干预。
如果需要扩展应用程序,可以启动运行相同应用程序(相同应用程序id)的新示例,一些已分配的任务将重新分配给新示例。本地状态存储的迁移将由库自动处理:
当重新分配发生时,一些分区以及相应的任务(包括任何本地状态存储)将从现有线程“迁移”到新添加的线程。因此,kafka流在kafka主题分区的粒度上有效地重新平衡了应用程序示例之间的工作负载。
我建议你看看这本指南。

相关问题