ClassNotFoundException when submitting a job via RestClusterClient

Posted by ffx8fchx on 2021-06-24 in Flink

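I have a Flink job that runs fine when I upload it to the cluster manually (through the web UI).
However, when I try to deploy it through the RestClusterClient, it fails with a ClassNotFoundException (I do see the job appear on the cluster, and then fail).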

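// Imports assume the package layout of the Flink releases that expose this PackagedProgram constructor.
import java.io.File
import org.apache.flink.client.deployment.StandaloneClusterId
import org.apache.flink.client.program.PackagedProgram
import org.apache.flink.client.program.rest.RestClusterClient
import org.apache.flink.configuration.{Configuration, JobManagerOptions}

// Wrap the fat jar together with its program arguments.
val packagedProgram = new PackagedProgram(
    new File("/home/laurent/Projects/lead-job-runner/target/lead-0.1-jar-with-dependencies.jar"),
    Array("--kafka-bootstrap-servers", "kafka:29092"): _*
)

// Point the client at the JobManager.
val configuration = new Configuration()
configuration.setString(JobManagerOptions.ADDRESS, "localhost")
configuration.setString("jobmanager.rpc.port", "6123")

val clusterClient = new RestClusterClient[StandaloneClusterId](
    configuration,
    StandaloneClusterId.getInstance()
)
// Submit the job with a parallelism of 2.
clusterClient.run(packagedProgram, 2)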

Here are the logs from the JobManager:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
    at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:398)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
    at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.<init>(TwoInputStreamTask.java:55)
    at sun.reflect.GeneratedConstructorAccessor11.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: eu.euranova.chng.PlanBuilder$$anon$1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at java.util.ArrayList.readObject(ArrayList.java:797)
    at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
    at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:395)
    ... 10 more

The class in question is indeed not packaged in the jar (it is a Scala anonymous function class).
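One way to double-check that observation is to look the class up directly in the jar file. The snippet below is only a minimal sketch using the JDK's `java.util.jar.JarFile`; the object name `JarClassCheck` and its methods are hypothetical helpers introduced for illustration, not part of Flink.

```scala
import java.util.jar.JarFile

// Hypothetical helper (not part of Flink): checks whether a class file is present in a jar.
object JarClassCheck {

  // Maps a binary class name such as "a.b.C$$anon$1" to its jar entry path "a/b/C$$anon$1.class".
  def classEntryName(className: String): String =
    className.replace('.', '/') + ".class"

  // Returns true if the jar at jarPath contains an entry for the given class.
  def jarContainsClass(jarPath: String, className: String): Boolean = {
    val jar = new JarFile(jarPath)
    try jar.getEntry(classEntryName(className)) != null
    finally jar.close()
  }

  // Lists all entry names starting with the given prefix, e.g. to see which
  // anonymous classes of an enclosing class were actually packaged.
  def entriesWithPrefix(jarPath: String, prefix: String): List[String] = {
    val jar = new JarFile(jarPath)
    try {
      val entries = jar.entries()
      val found = List.newBuilder[String]
      while (entries.hasMoreElements) {
        val name = entries.nextElement().getName
        if (name.startsWith(prefix)) found += name
      }
      found.result()
    } finally jar.close()
  }
}
```

With the values from this question, the call would be `JarClassCheck.jarContainsClass("/home/laurent/Projects/lead-job-runner/target/lead-0.1-jar-with-dependencies.jar", "eu.euranova.chng.PlanBuilder$$anon$1")`. Running it against the exact file handed to the client and against the file uploaded through the UI would show whether the two really are the same artifact.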

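But then why does the job run fine when the exact same fat jar is uploaded manually, yet fail when submitted through the RestClusterClient?
Also, when I set the Flink log level to DEBUG, I see a lot of logs like these in IntelliJ: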

DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming TwoInputTransformation{id=17, name='process-control-operator', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.ShareEvent>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraph              - CO-TASK: 17
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming SideOutputTransformation{id=21, name='SideOutput', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.flink.streaming.control.event.ControlEvent>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming TwoInputTransformation{id=20, name='process-control-operator', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.Row>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming TwoInputTransformation{id=19, name='Co-Map', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.Row>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming PartitionTransformation{id=12, name='Partition', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), eu.euranova.model.InstallEvent(userId: String, gameId: String, timestamp: Long, name: String, dumVar: Long)>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraph              - CO-TASK: 19
DEBUG org.apache.flink.streaming.api.graph.StreamGraph              - CO-TASK: 20
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming SideOutputTransformation{id=24, name='SideOutput', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.flink.streaming.control.event.ControlEvent>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming TwoInputTransformation{id=23, name='process-control-operator', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.Row>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraph              - CO-TASK: 23

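... which seems to indicate that at least the "driver" runs locally while the tasks are sent to the cluster (the other explanation would be that the cluster sends all of its logs back to the RestClusterClient, which I find unlikely).
Do you know what causes this? How can I make it work?
Thanks in advance for your help.
Best regards,
Laurent.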
