我需要将相同的<key,value>消息提供给两个不同的分支,但从单个流读取。两个分支都应该获得所有消息(即,我对两个分支的 predicate 是相同的)。
我尝试了下面的代码,但我看到只有第一个分支得到消息,而第二个分支没有得到消息。
KStream<String, byte[]>[] branches = builder.<String, byte[]>stream("source-topic)
.branch((key, val) -> true, // this goes to output topic1
(key, val) -> true); // this goes to output topic2
// branch[0] map on logic1Ondata and send to topic1
branches[0].map(logic1OnData).filter(
(key, value) -> {
if (key == null || value == null)
return false;
return value.data() != null;
}).to("topic1", Produced.with(Serdes.String(), Serdes.String())
// branch[1] map on logic2Ondata and send to topic2
branches[1].map(logic2OnData).filter(
(key, value) -> {
if (key == null || value == null)
return false;
return value.data() != null;
}).to("topic2", Produced.with(Serdes.String(), Serdes.String())
期望两个分支从“源主题”得到相同的消息。每个分支分别对logic1ondata和logic2ondata进行验证,并写入topic1和topic2,但我只看到分支[0]接收消息,没有看到分支[1]接收消息
如果我不能选择分支,我想知道有没有其他的方法。
编辑:分支有点像if else。因此,如果第一个 predicate 匹配,它将不考虑rest。因此,只有topic1获得了上述逻辑的消息。
相反,用下面的方法,我可以得到消息是给多个路径。
KStream<String, byte[]>[] stream = builder.<String, byte[]>stream("source-topic);
stream.map(logic1OnData).filter(
(key, value) -> {
if (key == null || value == null)
return false;
return value.data() != null;
}).to("topic1", Produced.with(Serdes.String(), Serdes.String())
stream.map(logic2OnData).filter(
(key, value) -> {
if (key == null || value == null)
return false;
return value.data() != null;
}).to("topic2", Produced.with(Serdes.String(), Serdes.String())
这一变化为两个Map提供了所有信息。进一步筛选并写入各自的主题。
暂无答案!
目前还没有任何答案,快来回答吧!