kafka streams dsl过程方法是如何工作的?

pcww981p  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(339)

我一直在与Kafka流工作了一点,得到一些基本的功能工作,但我有一些麻烦,我的头围绕着 process Kafka流dsl中的方法。明确地:
我知道有两种使用kafka流的方法,低层处理api和高层流dsl。在较低的层次上,您可以更明确地定义您的拓扑,命名每个节点等,而streams dsl将其中的大部分抽象掉。
然而,更高级别的流dsl有一个名为 process() 这是一个终端操作(即方法返回 void ). 所以我的问题是,处理过的数据-数据 Processorvoid forward(key, value) 方法发送-开始?
在较低级别的处理api中,您可以命名处理器节点并可以链接 Sink 但是在dsl流中没有名字,或者至少没有我能找到的名字。

tp5buhyn

tp5buhyn1#

有可能但“笨拙”地 forward() 数据来自 Processor 在dsl中。正如你所说的 process() 方法被定义为终端操作,因此,您不应该调用 forward() . 如果你打电话 forward() 无论如何,没有分配下游处理器, forward() 基本上是禁止的。
但是,与其尝试为其添加下游处理器 process() (什么是黑客),你应该用 transform() (b)和相关方法, transformValues() , flatTransform() ,和 flatTransformValues() )相反,您希望将数据发送到下游。

相关问题