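I have a Flink job that runs fine when I upload it to the cluster manually (through the UI).
However, when I try to deploy it through RestClusterClient, I get a ClassNotFoundException (I do see the job appear on the cluster, and then it fails).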
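import java.io.File
import org.apache.flink.client.deployment.StandaloneClusterId
import org.apache.flink.client.program.PackagedProgram
import org.apache.flink.client.program.rest.RestClusterClient
import org.apache.flink.configuration.{Configuration, JobManagerOptions}

// Wrap the fat jar and the program arguments
val packagedProgram = new PackagedProgram(
  new File("/home/laurent/Projects/lead-job-runner/target/lead-0.1-jar-with-dependencies.jar"),
  Array("--kafka-bootstrap-servers", "kafka:29092"): _*
)
// Point the client at the JobManager
val configuration = new Configuration()
configuration.setString(JobManagerOptions.ADDRESS, "localhost")
configuration.setString("jobmanager.rpc.port", "6123")
val clusterClient = new RestClusterClient[StandaloneClusterId](
  configuration,
  StandaloneClusterId.getInstance()
)
// Submit the program with parallelism 2
clusterClient.run(packagedProgram, 2)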
Here are the logs from the JobManager:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:398)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1164)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.<init>(TwoInputStreamTask.java:55)
at sun.reflect.GeneratedConstructorAccessor11.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: eu.euranova.chng.PlanBuilder$$anon$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at java.util.ArrayList.readObject(ArrayList.java:797)
at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:395)
... 10 more
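The class in question is indeed not a class I packaged by name in the jar (it is a Scala anonymous function class).
But why does the job run fine when I submit exactly the same fat jar manually, and fail when I submit it through RestClusterClient?
Also, when I set the Flink log level to DEBUG, I see a lot of logs like these in IntelliJ: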
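For what it's worth, one way to double-check whether the anonymous class is really present in the fat jar is a small lookup like the sketch below. It only uses `java.util.jar.JarFile` from the JDK; the object name `JarClassCheck` and its two helper methods are hypothetical names made up for this illustration, and the jar path and class name are the ones from the code and stack trace above.

```scala
import java.util.jar.JarFile

// Hypothetical helper, not part of Flink: checks whether a class file exists in a jar.
object JarClassCheck {
  // Converts a fully qualified class name into its entry path inside a jar.
  def entryName(className: String): String =
    className.replace('.', '/') + ".class"

  // Returns true if the jar at jarPath contains an entry for className.
  def containsClass(jarPath: String, className: String): Boolean = {
    val jar = new JarFile(jarPath)
    try jar.getEntry(entryName(className)) != null
    finally jar.close()
  }

  def main(args: Array[String]): Unit = {
    // Jar path and class name taken from the question above.
    val jarPath = "/home/laurent/Projects/lead-job-runner/target/lead-0.1-jar-with-dependencies.jar"
    val className = "eu.euranova.chng.PlanBuilder$$anon$1"
    println(containsClass(jarPath, className))
  }
}
```

If this prints `true`, the class is in the jar and the problem is more likely in how the jar reaches the task managers than in how it was built.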
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming TwoInputTransformation{id=17, name='process-control-operator', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.ShareEvent>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraph - CO-TASK: 17
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming SideOutputTransformation{id=21, name='SideOutput', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.flink.streaming.control.event.ControlEvent>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming TwoInputTransformation{id=20, name='process-control-operator', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.Row>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming TwoInputTransformation{id=19, name='Co-Map', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.Row>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming PartitionTransformation{id=12, name='Partition', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), eu.euranova.model.InstallEvent(userId: String, gameId: String, timestamp: Long, name: String, dumVar: Long)>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraph - CO-TASK: 19
DEBUG org.apache.flink.streaming.api.graph.StreamGraph - CO-TASK: 20
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming SideOutputTransformation{id=24, name='SideOutput', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.flink.streaming.control.event.ControlEvent>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming TwoInputTransformation{id=23, name='process-control-operator', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.Row>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraph - CO-TASK: 23
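... which seems to indicate that at least the "driver" runs locally while the tasks are sent to the cluster (the other explanation would be that the cluster sends all of its logs back to the RestClusterClient, which I find unlikely).
Do you know what causes this, and how I can make it work?
Thanks in advance for your help.
Best regards,
Laurent.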