我有一个处理器api处理器,它在内部转发给几个独立的接收器(想想事件分类器,尽管它在事件之间也有状态逻辑)。我想稍后再把这两个主题联系起来。一旦加入,我会将元素的更新(丰富)版本转发给我实际加入的主题。
如果在处理器api代码中转发到多个接收器(sink1、sink2),这些接收器又被发送到主题中,您将如何混合dsl?
我想你可以创建单独的流,比如
val stream1 = builder.stream(outputTopic)
val stream2 = builder.stream(outputTopic2)
从那里开始建造?然而,这会产生更多的次主题-这意味着什么?
另一种可能是在处理器api中拥有自己的状态存储,并在同一个处理器中管理它(实际上我正在这么做)。它增加了代码的复杂性,但不是更有效吗?例如,您可以删除不再使用的数据(一旦建立了联接,就可以将新联接的数据转发到接收器,并且这些数据不再符合联接的条件)。还有其他效率吗?
1条答案
按热度按时间llew8vvj1#
最简单的方法可能是从一个
StreamsBuilder
使用transform()
```StreamsBuilder builder = new StreamsBuilder()
KStream[] streams = builder.stream("input-topic")
.transform(/* put your processor API code here */)
.branch(...);
KStream joined = streams[0].join(streams[1], ...);