Kafka流:一个记录到多个记录

3b6akqbq  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(379)

给定:我在Kafka有两个主题,比方说主题a和主题b。kafka流从主题a读取一条记录,对其进行处理,并生成多条与消耗的记录相对应的记录(比如recorda和recordb)。现在,问题是如何使用Kafka流实现这一点。

KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
        @Override
        public List<Message> apply(final Message message) {
          return consumerRecordHandler.process(message);
        }
    }).*someFunction*()

这里,读取的记录是消息;处理之后,它返回一个消息列表。如何将此列表划分为两个生产者流?任何帮助都将不胜感激。

1aaf6o9v

1aaf6o9v1#

我不确定我是否正确理解了这个问题,我也不明白@abhishek的答案:(
如果您有一个输入流,并且希望每个输入记录获得零个、一个或多个输出记录,那么可以应用 flatMap() 或者 flatMapValues() (取决于是否要修改密钥)。
您还询问“如何将此列表划分为两个生产者流?”如果您要将一个流划分为多个流,则可以使用 branch() .
更多详情,请参阅以下文件:http://docs.confluent.io/current/streams/developer-guide.html#stateless-变换

6uxekuva

6uxekuva2#

你的钥匙是什么类型的?我猜不是 String . 在执行 mapValues 你会得到这个- KStream<K,List<Message>> . 如果 K 不是 String 那么 someFunction() 可以是一个 map 将转换为 K 进入 String (如果是,您已经有了结果)并离开 List<Message> (价值)未被触及,因为这是你预期的最终结果

相关问题