我有一个从RichSinkFunction扩展的接收器,它正在缓存一些信息。当我的进程完成时,我希望更新所有缓存的信息,以便强制调用它。
我可以从一个KeyedProcessAccumulatorFunction调用该接收器,用类型为ReadOnlyContext的上下文调用它,它就可以工作了。
public class PageAccumulateFunction implements KeyedProcessAccumulatorFunction{
public SessionAccumulator accumulate(
@NonNull Tuple2<CollectionMessage, PropertyInfo> value,
@NonNull SessionAccumulator accumulator,
@NonNull KeyedBroadcastProcessFunction.ReadOnlyContext ctx) {
....
ctx.output(outputTag, message);
}
}
但是在我的RichMapFunction类中,我不能调用那个接收器。我可以得到一个RuntimeContext对象(但不是ReadOnlyContext),但是我不知道我是否可以用它来调用RichSinkFunction接收器。
public class SessionMapper extends RichMapFunction<SessionAccumulator, GenericRecord>{
public GenericRecord map(SessionAccumulator sessionAccumulator) {
....
RuntimeContext ctx = getRuntimeContext();
....
}
}
你知道吗?
1条答案
按热度按时间oknwwptz1#
只有过程函数可以使用端输出(通过
ctx.output
写入)。MapFunction
自动向下游发送其map
方法的返回值(朝向接收器)。它以这种方式工作是因为Map是从输入到输出的一对一Map。大多数其他函数类型(例如,进程函数、平面Map)被传递给收集器,您可以使用收集器向下游发送事件。