Scala: running operations on two different DataSets in the same job fails with CorruptConfigurationException because of an incompatible local class

qncylg1j posted on 2021-06-24 in Flink

I am writing a Flink job in Scala that uses two DataSets, as follows:

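import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import scala.collection.JavaConverters._ // provides .asScala

val env = ExecutionEnvironment.getExecutionEnvironment
val latsDS : DataSet[String] = {
  env.fromCollection(lats.values.toSeq)
}
val entsDS : DataSet[Seq[Entity]] = {
  env.fromCollection(ents.values.map(l => l.asScala))
}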

After that, I do:

val ec = entsDS.map( x => x.length ).map((_,1)).groupBy(0).sum(1)
ec.writeAsCsv(path,"\n","-",WriteMode.OVERWRITE)
env.execute

This works perfectly, as expected.
If I do this instead of the above:

val wc = latsDS.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map((_,1)))
.groupBy(0).sum(1)
wc.writeAsText(path2,WriteMode.OVERWRITE)
env.execute

This also works perfectly. However, if I write this instead:

val wc = latsDS.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map((_,1)))
.groupBy(0).sum(1)
wc.writeAsText(path2,WriteMode.OVERWRITE)
val ec = entsDS.map( x => x.length ).map((_,1)).groupBy(0).sum(1)
ec.writeAsCsv(path,"\n","-",WriteMode.OVERWRITE)
env.execute

I get:

Caused by: java.io.InvalidClassException: com.testflink.mywordcount.WordCount$$anon$4; local class incompatible: stream classdesc serialVersionUID = -8230073325803923865, local class serialVersionUID = -4336418598811571750

A more complete stack trace follows.

Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:505)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: com.testflink.mywordcount.WordCount$$anon$4; local class incompatible: stream classdesc serialVersionUID = -8230073325803923865, local class serialVersionUID = -4336418598811571750
            at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:502)
            ... 1 more
    Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: com.testflink.mywordcount.WordCount$$anon$4; local class incompatible: stream classdesc serialVersionUID = -8230073325803923865, local class serialVersionUID = -4336418598811571750
            at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:286)
            at org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1420)
            at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.setup(SynchronousChainedCombineDriver.java:90)
            at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:77)
            at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1291)
            at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:284)
            at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:86)
            ... 2 more
    Caused by: java.io.InvalidClassException: com.testflink.mywordcount.WordCount$$anon$4; local class incompatible: stream classdesc serialVersionUID = -8230073325803923865, local class serialVersionUID = -4336418598811571750
            at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
            at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
            at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
            at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:287)
            at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:245)
            at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
            ... 8 more
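For context on the message itself: when a serializable class declares no serialVersionUID, Java serialization derives one from the class's structure (its name, modifiers, interfaces and members), and deserialization throws InvalidClassException when the value recorded in the stream differs from that of the class loaded locally. A name like WordCount$$anon$4 is an anonymous class, which the Scala compiler typically numbers in order of appearance inside the enclosing class, so adding code to the job can change which class ends up with a given number. One possible (unconfirmed) reading is therefore that the client and the TaskManager are loading different builds of the job's classes. The sketch below is plain Scala with no Flink dependency; the names DerivedUid, PinnedUid and SerialVersionUidDemo are hypothetical. It prints the serialVersionUID that java.io.ObjectStreamClass reports for a class with a derived UID, a class pinned with Scala's @SerialVersionUID annotation, and an anonymous class.

```scala
import java.io.ObjectStreamClass

// Hypothetical class with no explicit serialVersionUID:
// the JVM derives one from the class's structure.
class DerivedUid(val n: Int) extends java.io.Serializable

// Hypothetical class whose serialVersionUID is pinned with Scala's annotation.
@SerialVersionUID(1L)
class PinnedUid(val n: Int) extends java.io.Serializable

object SerialVersionUidDemo {
  // The value Java serialization writes into the stream and checks on read.
  def uidOf(cls: Class[_]): Long = ObjectStreamClass.lookup(cls).getSerialVersionUID

  // An anonymous class; scalac typically names it SerialVersionUidDemo$$anon$1.
  val anon: java.io.Serializable = new java.io.Serializable {}

  def main(args: Array[String]): Unit = {
    println(s"DerivedUid: ${uidOf(classOf[DerivedUid])}")
    println(s"PinnedUid: ${uidOf(classOf[PinnedUid])}")
    println(s"${anon.getClass.getName}: ${uidOf(anon.getClass)}")
  }
}
```

Running the same lookup for com.testflink.mywordcount.WordCount$$anon$4 against the client's JAR and against the classpath the TaskManager uses would show whether both sides see the same version of that class.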
