我想知道是否有可能使一个类型的数据流
DataStream<Tuple4<String, Charge, List<Charge>, Table>>
对于元组中的表类型,表来自flink的table api,我尝试传递变量accumulatortable,它是process函数中的flink表,以返回元组,如下所示:
DataStream<Tuple4<String, Charge, List<Charge>, Table>> joinStream =
currentStreamByKeys
.connect(historicStreamByKeys)
.flatMap(new LeftJoin())
.process(new ProcessFunction<Tuple2<Charge, List<Charge>>, Tuple4<String, Charge, List<Charge>, Table>>() {
@Override
public void processElement(Tuple2<Charge, List<Charge>> value, Context ctx, Collector<Tuple4<String, Charge, List<Charge>, Table>> out) throws Exception {
out.collect(
new Tuple4<>(
KeysExtractor.getKey(keys,value.f0),
value.f0,
value.f1,
accumulatorTable
)
);
}
})
.keyBy(0);
但我得到了一个错误:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the StreamExecutionEnvironment is not serializable. The object probably contains or references non serializable fields.
有没有可能实现这一点使用flink或使用typehints?提前感谢!
暂无答案!
目前还没有任何答案,快来回答吧!