streamexecutionenvironment不能与apache flink中的表元组序列化

bq3bfh9z  于 2021-06-26  发布在  Flink
关注(0)|答案(0)|浏览(247)

我想知道是否有可能使一个类型的数据流

DataStream<Tuple4<String, Charge, List<Charge>, Table>>

对于元组中的表类型,表来自flink的table api,我尝试传递变量accumulatortable,它是process函数中的flink表,以返回元组,如下所示:

DataStream<Tuple4<String, Charge, List<Charge>, Table>> joinStream =
            currentStreamByKeys
                    .connect(historicStreamByKeys)
                    .flatMap(new LeftJoin())
                    .process(new ProcessFunction<Tuple2<Charge, List<Charge>>, Tuple4<String, Charge, List<Charge>, Table>>() {
                        @Override
                        public void processElement(Tuple2<Charge, List<Charge>> value, Context ctx, Collector<Tuple4<String, Charge, List<Charge>, Table>> out) throws Exception {
                            out.collect(
                                    new Tuple4<>(
                                            KeysExtractor.getKey(keys,value.f0),
                                            value.f0,
                                            value.f1,
                                            accumulatorTable
                                    )
                            );
                        }
                    })
                    .keyBy(0);

但我得到了一个错误:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the StreamExecutionEnvironment is not serializable. The object probably contains or references non serializable fields.

有没有可能实现这一点使用flink或使用typehints?提前感谢!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题