我有一个Flink0.10快照的代码,我得到了一个运行时错误,我不太明白。
代码如下:
DataStream<Thing> kafkaThings = env.addSource(
new FlinkKafkaConsumer082<Thing>(
"thing_data", // Topic to read from
new ThingSchema(reader), // Deserializer (provided as util)
props)
);
kafkaThings
.flatMap(new ThingFlatMapper())
.addSink(new ThingSink());
下面是运行时错误 flatMap()
步骤(我想……):
06:16:20.145 [flink-akka.actor.default-dispatcher-4] ERROR akka.actor.ActorSystemImpl flink-akka.actor.default-dispatcher-3 - Uncaught fatal error from thread [flink-akka.actor.default-dispatcher-3] shutting down ActorSystem [flink]
java.lang.NoClassDefFoundError: scala/runtime/AbstractPartialFunction$mcVL$sp
at java.lang.ClassLoader.defineClass1(Native Method) ~[na:1.8.0_45]
at java.lang.ClassLoader.defineClass(ClassLoader.java:760) ~[na:1.8.0_45]
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[na:1.8.0_45]
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) ~[na:1.8.0_45]
at java.net.URLClassLoader.access$100(URLClassLoader.java:73) ~[na:1.8.0_45]
at java.net.URLClassLoader$1.run(URLClassLoader.java:368) ~[na:1.8.0_45]
at java.net.URLClassLoader$1.run(URLClassLoader.java:362) ~[na:1.8.0_45]
at java.security.AccessController.doPrivileged(Native Method) ~[na:1.8.0_45]
at java.net.URLClassLoader.findClass(URLClassLoader.java:361) ~[na:1.8.0_45]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_45]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[na:1.8.0_45]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_45]
at org.apache.flink.runtime.LeaderSessionMessageFilter$class.receive(LeaderSessionMessageFilter.scala:33) ~[flink-runtime-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$LogMessages$$super$receive(JobManager.scala:100) ~[flink-runtime-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
at org.apache.flink.runtime.LogMessages$class.receive(LogMessages.scala:26) ~[flink-runtime-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
at org.apache.flink.runtime.jobmanager.JobManager.receive(JobManager.scala:100) ~[flink-runtime-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
at akka.actor.ActorCell.newActor(ActorCell.scala:567) ~[akka-actor_2.11-2.4.0.jar:na]
at akka.actor.ActorCell.create(ActorCell.scala:587) ~[akka-actor_2.11-2.4.0.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:460) ~[akka-actor_2.11-2.4.0.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:482) ~[akka-actor_2.11-2.4.0.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) [akka-actor_2.11-2.4.0.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:223) [akka-actor_2.11-2.4.0.jar:na]
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) [akka-actor_2.11-2.4.0.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.7.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.7.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.7.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.7.jar:na]
Caused by: java.lang.ClassNotFoundException: scala.runtime.AbstractPartialFunction$mcVL$sp
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_45]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_45]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) ~[na:1.8.0_45]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_45]
... 27 common frames omitted
我想它在那里是因为我在两个方向上都设置了断点 addSource
而在 ThingFlatMapper.flatMap
方法。它到了第一个,但不是第二个。
我认为这是一个简单的配置错误,但是错误消息并没有什么帮助(至少对我来说是这样)。
思想?
(作为一个事后的想法,在这里列出我的构建依赖可能是有意义的——这里是我的 build.gradle
:
compile "org.apache.flink:flink-clients:${project['flink.version']}"
compile "org.apache.flink:flink-connector-kafka:${project['flink.version']}"
compile "org.apache.flink:flink-core:${project['flink.version']}"
compile "org.apache.flink:flink-java:${project['flink.version']}"
compile "org.apache.flink:flink-optimizer:${project['flink.version']}"
compile "org.apache.flink:flink-streaming-java:${project['flink.version']}"
compile "org.apache.flink:flink-runtime:${project['flink.version']}"
compile "org.apache.flink:flink-java8:${project['flink.version']}"
flink.version设置为 1.0-SNAPSHOT
,我是从内部策划的。
暂无答案!
目前还没有任何答案,快来回答吧!