处理器api-转发到不同的主题

vi4fp9gy  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(389)

我有一个处理器api处理器,它在内部转发给几个独立的接收器(想想事件分类器,尽管它在事件之间也有状态逻辑)。我想稍后再把这两个主题联系起来。一旦加入,我会将元素的更新(丰富)版本转发给我实际加入的主题。
如果在处理器api代码中转发到多个接收器(sink1、sink2),这些接收器又被发送到主题中,您将如何混合dsl?
我想你可以创建单独的流,比如

val stream1 = builder.stream(outputTopic) 
val stream2 = builder.stream(outputTopic2)

从那里开始建造?然而,这会产生更多的次主题-这意味着什么?
另一种可能是在处理器api中拥有自己的状态存储,并在同一个处理器中管理它(实际上我正在这么做)。它增加了代码的复杂性,但不是更有效吗?例如,您可以删除不再使用的数据(一旦建立了联接,就可以将新联接的数据转发到接收器,并且这些数据不再符合联接的条件)。还有其他效率吗?

llew8vvj

llew8vvj1#

最简单的方法可能是从一个 StreamsBuilder 使用 transform() ```
StreamsBuilder builder = new StreamsBuilder()
KStream[] streams = builder.stream("input-topic")
.transform(/* put your processor API code here */)
.branch(...);

KStream joined = streams[0].join(streams[1], ...);

也可以先将中间流写入主题并读回。您得到更多子拓扑的事实应该是无关紧要的。
通过状态手动执行连接是可能的,但很难正确编码。如果可能,我建议使用dsl提供的join操作符。

相关问题