java—mapfunction的实现不是可序列化的flink

xyhw6mcr  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(928)

我正在尝试实现一个类,该类允许用户在不限制输入流类型的情况下操作n个输入流。
对于初学者,我想将所有输入数据流转换为keyedstreams。因此,我将输入数据流Map到一个元组中,然后应用keyby将其转换为keystream。
我总是遇到序列化的问题,我试着遵循这个指南https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html 但没有成功。
我想知道的是:
什么是java中的序列化/反序列化?以及它的用途。
在flink中,我可以用序列化来解决哪些问题
我的代码有什么问题(您可以在代码和错误消息下面找到)
非常感谢你。
主要类别:

public class CEP {

private  Integer streamsIdComp = 0;
final  private Map<Integer, DataStream<?> > dataStreams = new HashMap<>();
final  private Map<Integer, TypeInformation<?>> dataStreamsTypes = new HashMap<>();

public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream){

    Preconditions.checkNotNull(inputStream, "dataStream");
    TypeInformation<T> streamType = inputStream.getType();

    KeyedStream<Tuple2<Integer,T>,Integer> keyedInputStream = inputStream.
            map(new MapFunction<T, Tuple2<Integer,T>>() {
                @Override
                public Tuple2<Integer, T> map(T value) throws Exception {
                    return Tuple2.of(streamsIdComp, value);
                }
            }).
            keyBy(new KeySelector<Tuple2<Integer, T>, Integer>() {
                @Override
                public Integer getKey(Tuple2<Integer, T> integerTTuple2) throws Exception {
                    return integerTTuple2.f0;
                }
            });
    return keyedInputStream;
}

public <T1> void addInputStream(DataStream<T1> inputStream) {

    TypeInformation<T1> streamType = inputStream.getType();

    dataStreamsTypes.put(streamsIdComp, streamType);
    dataStreams.put(streamsIdComp, this.converttoKeyedStream(inputStream));
    streamsIdComp++;
}
}

测试等级

public class CEPTest {

@Test
public void addInputStreamTest() throws Exception {
    //test if we can change keys in a keyedStream
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Record> input1 = env.fromElements(
            new Record("1", 1, "a"),
            new Record("2", 2, "b"),
            new Record("3", 3, "c"))
            .keyBy(Record::getBizName);

    DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4);

    CEP cepObject = new CEP();
    cepObject.addInputStream(input1);
    cepObject.addInputStream(input2);

   }
}

错误消息

org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction 
is not serializable. The implementation accesses fields of its enclosing class, which is a 
common reason for non-serializability. A common solution is to make the function a proper 
(non-inner) class, or a static inner class.

at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:188)
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:590)
at CEP.converttoKeyedStream(CEP.java:25)
at CEP.addInputStream(CEP.java:45)
at CEPTest.addInputStreamTest(CEPTest.java:33)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: java.io.NotSerializableException: CEP
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
... 29 more
5lwkijsr

5lwkijsr1#

flink是一个分布式框架。这意味着,您的程序可能要在数千个节点上运行。这也意味着每个工作节点必须接收要与所需上下文一起执行的代码。简化一点,流经系统的事件和要执行的函数都必须是可序列化的,因为它们是通过有线传输的。这就是为什么序列化在分布式编程中非常重要的原因。
简而言之,序列化是将数据编码为字节表示的过程,可以在另一个节点(另一个jvm)上传输和恢复。
回到问题上来。你的理由是:

Caused by: java.io.NotSerializableException: CEP

这是由线路引起的

return Tuple2.of(streamsIdComp, value);

您正在使用 streamsIdComp 变量中的字段 CEP 班级。这意味着,flink必须序列化整个类才能在执行时访问这个字段 MapFunction . 你可以通过引入局部变量来克服它 converttoKeyedStream 功能:

public <T> KeyedStream<Tuple2<Integer, T>, Integer> converttoKeyedStream(DataStream<T> inputStream){

    Preconditions.checkNotNull(inputStream, "dataStream");
    TypeInformation<T> streamType = inputStream.getType();
    // note this variable is local
    int localStreamsIdComp = streamsIdComp;

    KeyedStream<Tuple2<Integer,T>,Integer> keyedInputStream = inputStream.
            map(new MapFunction<T, Tuple2<Integer,T>>() {
                @Override
                public Tuple2<Integer, T> map(T value) throws Exception {
                    // and is used here
                    return Tuple2.of(localStreamsIdComp, value);
                }
            }).
            keyBy(new KeySelector<Tuple2<Integer, T>, Integer>() {
                @Override
                public Integer getKey(Tuple2<Integer, T> integerTTuple2) throws Exception {
                    return integerTTuple2.f0;
                }
            });
    return keyedInputStream;
}

这样一来,flink只能序列化这个变量,而不是整个类本身。

相关问题