我们有以下流处理要求。
Source Stream ->
transform(condition check - If (true) then generate MULTIPLE ADDITIONAL messages else just transform the incoming message) ->
output kafka topic
Example:
If condition is true for message B(D,E,F are the additional messages produced)
A,B,C -> A,D,E,F,C -> Sink Kafka Topic
If condition is false
A,B,C -> A,B,C -> Sink Kafka Topic
有没有一种方法可以在Kafka流中实现这一点?
1条答案
按热度按时间q9yhzks01#
你可以用
flatMap()
或者flatMapValues()
方法。这些方法获取一条记录并生成零条、一条或多条记录。flatMap()
可以修改键、值及其数据类型,同时flatMapValues()
保留原始键并更改值和值数据类型。下面是一个伪代码示例,考虑到新消息“c”、“d”、“e”将有一个新密钥。
您可以阅读更多信息:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-开发者指南dsl转换无状态