我想将函数名作为父类中的参数,以便子类可以设置它。此变量将用于父类的方法之一。
abstract class Parent[T: TypeInformation] {
val myfun: T => Unit
// A different method uses myfun
}
class Child1 extends Parent[User] {
val service = new Service()
val myfun: User => Unit = service.callme
}
class Service {
def callme(user: User) => Unit = {
println("We are here for user")
}
}
我对斯卡拉还不熟悉,但看起来还不错。虽然编译器没有抱怨,但我遇到运行时异常,作业无法启动:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:250)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:427)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field Child1.myfun of type scala.Function1 in instance of Child1
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2372)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2054)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1635)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:235)
... 11 more
知道如何在child中将函数名设置为变量值以便parent可以使用它吗?
1条答案
按热度按时间voj3qocg1#
你说的是传递函数名,但传递的是实际函数本身。看来这在Flink的连载中是行不通的。
另一种编码可能根本不使用函数值,而是使用普通的旧方法:
如果这不是现成的,它至少应该给你一个更清晰的错误信息(可能是关于
Service
在类路径上。解决这个问题可能会让你回到以前的编码