Scala: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field Child1.myfun of type scala.Function1

2skhul33, posted 2021-06-21 in Flink

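I want to declare a function name as a member of a parent class so that child classes can set it. The parent class uses this variable in one of its methods.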

abstract class Parent[T: TypeInformation] {
   val myfun: T => Unit

   // A different method uses myfun
}

class Child1 extends Parent[User] {
   val service = new Service()

   val myfun: User => Unit = service.callme
}

class Service {
   def callme(user: User): Unit = {
      println("We are here for user")
   }
}

I'm new to Scala, but this looks fine to me. The compiler doesn't complain, yet I get a runtime exception and the job fails to start:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:250)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:427)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field Child1.myfun of type scala.Function1 in instance of Child1
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2372)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2054)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1635)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:235)
    ... 11 more

Any idea how to set the function name as a variable value in the child so that the parent can use it?

voj3qocg1#

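You say you are passing a function name, but what you are passing is the actual function itself. It seems this doesn't work with Flink's serialization.
An alternative encoding would not use a function value at all and would use a plain old method instead: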

abstract class Parent[T: TypeInformation] {
   def myfun(t: T): Unit

   // A different method uses myfun
}

class Child1 extends Parent[User] {
   val service = new Service()

   def myfun(t: User): Unit = service.callme(t)
}

class Service {
   def callme(user: User): Unit = {
      println("We are here for user")
   }
}

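If this doesn't work out of the box, it should at least give you a clearer error message (probably something about Service on the classpath).
Fixing that might let you go back to your previous encoding.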
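As a rough local check of the method-based encoding, the sketch below pushes a Child1 instance through plain Java serialization, the same mechanism the java.io.ObjectInputStream frames in the stack trace point to. Several things here are assumptions and not part of the original code: the User case class is a hypothetical stand-in, Service and Parent are marked Serializable, the TypeInformation context bound is dropped so the sketch has no Flink dependency, and process and RoundTrip are illustrative names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the User type from the question.
final case class User(name: String)

// Assumption: Service is made Serializable because Child1 keeps it in a field.
class Service extends Serializable {
  def callme(user: User): Unit =
    println(s"We are here for user ${user.name}")
}

// Assumption: the TypeInformation context bound is omitted to avoid a Flink dependency.
abstract class Parent[T] extends Serializable {
  // A plain abstract method instead of a function-valued field.
  def myfun(t: T): Unit

  // Illustrative "different method" that uses myfun.
  def process(t: T): Unit = myfun(t)
}

class Child1 extends Parent[User] {
  val service = new Service()

  def myfun(t: User): Unit = service.callme(t)
}

object RoundTrip {
  // Writes the value with Java serialization and reads it back in the same JVM.
  def roundTrip[A <: AnyRef](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new Child1)
    copy.process(User("alice"))
  }
}
```

This only confirms that the object graph is Java-serializable within a single JVM. It does not reproduce Flink's cluster classloading, so a job can still fail on deserialization even when this check passes.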
