给定:我在Kafka有两个主题,比方说主题a和主题b。kafka流从主题a读取一条记录,对其进行处理,并生成多条与消耗的记录相对应的记录(比如recorda和recordb)。现在,问题是如何使用Kafka流实现这一点。
KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
@Override
public List<Message> apply(final Message message) {
return consumerRecordHandler.process(message);
}
}).*someFunction*()
这里,读取的记录是消息;处理之后,它返回一个消息列表。如何将此列表划分为两个生产者流?任何帮助都将不胜感激。
2条答案
按热度按时间1aaf6o9v1#
我不确定我是否正确理解了这个问题,我也不明白@abhishek的答案:(
如果您有一个输入流,并且希望每个输入记录获得零个、一个或多个输出记录,那么可以应用
flatMap()
或者flatMapValues()
(取决于是否要修改密钥)。您还询问“如何将此列表划分为两个生产者流?”如果您要将一个流划分为多个流,则可以使用
branch()
.更多详情,请参阅以下文件:http://docs.confluent.io/current/streams/developer-guide.html#stateless-变换
6uxekuva2#
你的钥匙是什么类型的?我猜不是
String
. 在执行mapValues
你会得到这个-KStream<K,List<Message>>
. 如果K
不是String
那么someFunction()
可以是一个map
将转换为K
进入String
(如果是,您已经有了结果)并离开List<Message>
(价值)未被触及,因为这是你预期的最终结果