我正在使用一个inputTopic,并从中提取一个String(主题名称)列表。
List<String> Topics = {"Topic1", "Topic2", "Topic3"}
字符串
我需要为每个主题创建KStream以及它们的状态存储,其中应该只包含每个键的最后一个值。
这些主题(即,topic1、topic2和topic3已经存在,其中包含数百万条消息)。
如何创建这些(多个)流,异步地消耗每个主题的所有消息,并将它们写入outputTopic。
我对Kafka环境很陌生。
我正在使用一个inputTopic,并从中提取一个String(主题名称)列表。
List<String> Topics = {"Topic1", "Topic2", "Topic3"}
字符串
我需要为每个主题创建KStream以及它们的状态存储,其中应该只包含每个键的最后一个值。
这些主题(即,topic1、topic2和topic3已经存在,其中包含数百万条消息)。
如何创建这些(多个)流,异步地消耗每个主题的所有消息,并将它们写入outputTopic。
我对Kafka环境很陌生。
1条答案
按热度按时间z8dt9xmd1#
一个KStream可以同时使用多个topic,但是KTable不会按照topic名称存储数据。您需要按照topic名称进行过滤,例如使用
branch
操作符或TopicNameExtractor,或者只创建3个唯一的KStream示例,然后使用toTable
创建唯一的状态存储KTables自动存储具有最新键的值