kryo序列化问题spark

bxjv4tth  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(315)

我试图注册下面的类在Spark代码kryo序列化,但我收到一个错误。
代码:

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  lazy val producer = createProducer()
  def send(topic: String, key: String, value: String): Unit = producer.send(new ProducerRecord(topic, key, value))
}

object KafkaSink {
  def apply(config: Properties): KafkaSink = {
    val f = () => {
      val producer = new KafkaProducer[String, String](config)

      //close producer if VM exits
      sys.addShutdownHook {
        producer.close()
      }
      producer
    }
    new KafkaSink(f)
  }
}

错误:
原因:java.lang.illegalargumentexception:类未注册:com.test.kafkasink$$anonfun$1注意:要注册此类,请使用:kryo.register(com.test.kafkasink$$anonfun$1.class);在com.esotericsoftware.kryo.kryo.getregistration(kryo。java:488)在com.esotericsoftware.kryo.util.defaultclassresolver.writeclass(defaultclassresolver。java:97)在com.esotericsoftware.kryo.kryo.writeclass(kryo。java:517)在com.esotericsoftware.kryo.serializers.objectfield.write(objectfield。java:76) ... 14个以上
我试过用下面两种方法注册这个类,但都不起作用,并且给了我同样的错误

kryo.register(classOf[KafkaSink])
kryo.register(KafkaSink.getClass)

我怎样注册这个班?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题