在kafka streams应用程序中,示例仅获取已分配给该示例的分区的输入主题的消息。作为 group.id
,它基于(对于所有相同的示例) application.id
,这意味着每个示例只能看到主题的一部分。
当然,这一切都很有意义,我们在高吞吐量数据主题中利用了这一点,但是我们还希望通过向输入主题添加主题范围的“控制消息”来控制streams应用程序。但由于所有示例都需要获取这些消息,因此我们要么
每个分区一条控制消息(使发送者有必要知道分区方案,这是我们希望避免的)
每个键一个控制消息(因此每个活动分区将至少获得一个控制消息)
因为这对发送者来说很麻烦,所以除了数据主题之外,我们正在考虑为streams应用程序使用的控制消息创建一个新主题。但是,我们如何使每个分区都能接收来自控制消息主题的所有消息呢?
根据https://stackoverflow.com/a/55236780/709537,无法为kafka流设置组id。
一种方法是创建和使用 KafkaConsumer
除了使用kafka流之外,它还允许我们根据需要设置组id。然而,这听起来复杂而肮脏,足以让人怀疑,是否有一个更直接的方式,我们错过了。
有什么想法吗?
1条答案
按热度按时间chhqkbe11#
您可以使用从所有分区获取数据的全局存储。
从文件来看,
将全局状态存储添加到拓扑。statestore从提供的输入主题的所有分区中获取数据。每个kafka streams示例将正好有一个statestore示例。
语法如下:
最后一个论点是
ProcessorSupplier
它有一个get()
返回一个Processor
将对每个新消息执行。这个Processor
包含process()
方法,该方法将在每次向主题发送新消息时执行。全局存储是针对每个流示例的,因此您可以获得每个流示例中的所有主题数据。
在这个过程中(k键,v值),您可以编写您的处理逻辑。
全局存储可以是内存中的,也可以是持久的,并且可以由changelog主题支持,这样即使streams示例本地数据(state)被删除,也可以使用changelog主题构建存储。