我试图注册下面的类在Spark代码kryo序列化,但我收到一个错误。
代码:
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
lazy val producer = createProducer()
def send(topic: String, key: String, value: String): Unit = producer.send(new ProducerRecord(topic, key, value))
}
object KafkaSink {
def apply(config: Properties): KafkaSink = {
val f = () => {
val producer = new KafkaProducer[String, String](config)
//close producer if VM exits
sys.addShutdownHook {
producer.close()
}
producer
}
new KafkaSink(f)
}
}
错误:
原因:java.lang.illegalargumentexception:类未注册:com.test.kafkasink$$anonfun$1注意:要注册此类,请使用:kryo.register(com.test.kafkasink$$anonfun$1.class);在com.esotericsoftware.kryo.kryo.getregistration(kryo。java:488)在com.esotericsoftware.kryo.util.defaultclassresolver.writeclass(defaultclassresolver。java:97)在com.esotericsoftware.kryo.kryo.writeclass(kryo。java:517)在com.esotericsoftware.kryo.serializers.objectfield.write(objectfield。java:76) ... 14个以上
我试过用下面两种方法注册这个类,但都不起作用,并且给了我同样的错误
kryo.register(classOf[KafkaSink])
kryo.register(KafkaSink.getClass)
我怎样注册这个班?
暂无答案!
目前还没有任何答案,快来回答吧!