在Flink中从RichMap函数调用RichSinkFunction

gtlvzcf8  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(303)

我有一个从RichSinkFunction扩展的接收器,它正在缓存一些信息。当我的进程完成时,我希望更新所有缓存的信息,以便强制调用它。
我可以从一个KeyedProcessAccumulatorFunction调用该接收器,用类型为ReadOnlyContext的上下文调用它,它就可以工作了。

public class PageAccumulateFunction implements KeyedProcessAccumulatorFunction{
    public SessionAccumulator accumulate(
            @NonNull Tuple2<CollectionMessage, PropertyInfo> value,
            @NonNull SessionAccumulator accumulator,
            @NonNull KeyedBroadcastProcessFunction.ReadOnlyContext ctx) {
        ....
        ctx.output(outputTag, message);
    }
}

但是在我的RichMapFunction类中,我不能调用那个接收器。我可以得到一个RuntimeContext对象(但不是ReadOnlyContext),但是我不知道我是否可以用它来调用RichSinkFunction接收器。

public class SessionMapper extends RichMapFunction<SessionAccumulator, GenericRecord>{
    public GenericRecord map(SessionAccumulator sessionAccumulator) {
        ....
        RuntimeContext ctx = getRuntimeContext();
        ....
    }
}

你知道吗?

oknwwptz

oknwwptz1#

只有过程函数可以使用端输出(通过ctx.output写入)。
MapFunction自动向下游发送其map方法的返回值(朝向接收器)。它以这种方式工作是因为Map是从输入到输出的一对一Map。大多数其他函数类型(例如,进程函数、平面Map)被传递给收集器,您可以使用收集器向下游发送事件。

相关问题