我有一个接收记录的kafka流,我想根据特定字段连接消息。
流中的消息如下所示:
Key: 2099
Payload{
email: tom@emample.com
eventCode: 2099
}
预期产量:
key: 2099
Payload{
emails: tom@example, bill@acme.com, jane@example.com
}
我可以让这条小溪流淌得很好,我只是不知道兰达河应该包含什么。
这就是我到目前为止所做的。我不确定是否应该使用map、aggregate或reduce或这些操作的组合。
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Payload> inputStream = builder.stream(INPUT_TOPIC);
inputStream
.groupByKey()
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(300000)))
// Not sure what to do here …..
}).to (OUTPUT_TOPIC );
1条答案
按热度按时间xtfmy6hx1#
可能是这样的