我一直在与Kafka流工作了一点,得到一些基本的功能工作,但我有一些麻烦,我的头围绕着 process
Kafka流dsl中的方法。明确地:
我知道有两种使用kafka流的方法,低层处理api和高层流dsl。在较低的层次上,您可以更明确地定义您的拓扑,命名每个节点等,而streams dsl将其中的大部分抽象掉。
然而,更高级别的流dsl有一个名为 process()
这是一个终端操作(即方法返回 void
). 所以我的问题是,处理过的数据-数据 Processor
的 void forward(key, value)
方法发送-开始?
在较低级别的处理api中,您可以命名处理器节点并可以链接 Sink
但是在dsl流中没有名字,或者至少没有我能找到的名字。
1条答案
按热度按时间tp5buhyn1#
有可能但“笨拙”地
forward()
数据来自Processor
在dsl中。正如你所说的process()
方法被定义为终端操作,因此,您不应该调用forward()
. 如果你打电话forward()
无论如何,没有分配下游处理器,forward()
基本上是禁止的。但是,与其尝试为其添加下游处理器
process()
(什么是黑客),你应该用transform()
(b)和相关方法,transformValues()
,flatTransform()
,和flatTransformValues()
)相反,您希望将数据发送到下游。