Can't resolve error: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema

uplii1fm · posted on 2021-06-04 in Kafka

Follow (0) | Answers (1) | Views (815)

I'm trying to read data from a table through a SparkSession and publish it to a Kafka topic, using the following code:

import java.util.Properties

import scala.io.Source

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.io._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.spark.sql.SparkSession
import java.io.ByteArrayOutputStream

object Producer extends Serializable {

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)

    // Parse the Avro schema from a local file on the driver
    val lines = Source.fromFile("file")
    val schema = new Schema.Parser().parse(lines.mkString)
    lines.close()

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    import spark.implicits._
    val df = spark.sql("select * from table")

    df.rdd.map { value =>
      val prod = new KafkaProducer[String, Array[Byte]](props)

      // Build an Avro record from the row
      val records = new GenericData.Record(schema)
      records.put("col1", value.getString(1))
      records.put("col2", value.getString(2))
      records.put("col3", value.getString(3))
      records.put("col4", value.getString(4))

      // Encode the record as Avro binary
      val writer = new SpecificDatumWriter[GenericRecord](schema)
      val out = new ByteArrayOutputStream()
      val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
      writer.write(records, encoder)
      encoder.flush()
      out.close()

      val serializedBytes: Array[Byte] = out.toByteArray()
      val record = new ProducerRecord[String, Array[Byte]]("topic", value.getString(1), serializedBytes)
      val data = prod.send(record)

      prod.flush()
      prod.close()
    }

    spark.close()
  }
}

Executing it throws the following error:

Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
Serialization stack:
    - object not serializable (class: org.apache.avro.Schema$RecordSchema, value: {"type":"record","name":"data","namespace":"com.data.record","fields":[{"name":"col1","type":"string"},{"name":"col2","type":"string"},{"name":"col3","type":"string"},{"name":"col4","type":"string"}]})
    - field (class: scala.runtime.ObjectRef, name: elem, type: class java.lang.Object)
    - object (class scala.runtime.ObjectRef, {"type":"record","name":"data","namespace":"com.data.record","fields":[{"name":"col1","type":"string"},{"name":"col2","type":"string"},{"name":"col3","type":"string"},{"name":"col4","type":"string"}]})
    - field (class: com.kafka.driver.kafkaproducer.Producer$$anonfun$main$1, name: schema$1, type: class scala.runtime.ObjectRef)
However, when I pull the dataset to the driver with df.rdd.collect.foreach, it runs fine. I need to publish the messages from the executors instead, i.e. at the cluster level, hence rdd.map. I'm not sure what exactly I'm missing that causes this error; even a stripped-down closure that does nothing but reference the parsed schema hits the same exception, as sketched below. Any help resolving this would be much appreciated, thanks!
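A minimal sketch of that repro (the schema.avsc path and the field access are placeholders, not my actual job):

// Sketch only: referencing `schema` inside rdd.map makes Spark serialize it with the
// closure; org.apache.avro.Schema$RecordSchema is not Serializable, so the
// serializability check fails as soon as map() is called.
val schema = new Schema.Parser().parse(new java.io.File("schema.avsc")) // hypothetical path

val mapped = df.rdd.map { row =>
  new GenericData.Record(schema).getSchema.getFullName // any use of `schema` is enough
}
mapped.count()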

xmq68pz9

1#

It turned out that the schema object and the Kafka producer need to be created on the executors themselves. To do that, the code above was modified as follows:

import java.util.Properties

import scala.io.Source

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.io._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.spark.sql.{Row, SparkSession}
import java.io.ByteArrayOutputStream

object Producer extends Serializable {

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    import spark.implicits._
    val df = spark.sql("select * from table")

    df.foreachPartition { (rows: Iterator[Row]) =>
      // Create the producer and parse the schema on the executor, once per partition,
      // so no non-serializable object is captured by the closure
      val prod = new KafkaProducer[String, Array[Byte]](props)
      val lines = Source.fromFile("file")
      val schema = new Schema.Parser().parse(lines.mkString)
      lines.close()

      rows.foreach { value =>
        val records = new GenericData.Record(schema)
        records.put("col1", value.getString(1))
        records.put("col2", value.getString(2))
        records.put("col3", value.getString(3))
        records.put("col4", value.getString(4))

        // Encode the record as Avro binary
        val writer = new SpecificDatumWriter[GenericRecord](schema)
        val out = new ByteArrayOutputStream()
        val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
        writer.write(records, encoder)
        encoder.flush()
        out.close()

        val serializedBytes: Array[Byte] = out.toByteArray()
        val record = new ProducerRecord[String, Array[Byte]]("topic", value.getString(1), serializedBytes)
        val data = prod.send(record)
      }

      prod.flush()
      prod.close()
    }

    spark.close()
  }
}
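A further note on this design (a sketch, reusing props and df from the code above, and assuming the schema file only exists on the driver node): reading the schema file inside foreachPartition means the file has to be present on every executor. An alternative is to parse the schema once on the driver and ship only its JSON string into the closure, since a plain String serializes without trouble, then re-parse it per partition:

// Sketch: parse on the driver, capture only the schema's JSON text, re-parse on the executor.
val src = Source.fromFile("file")
val schemaJson = try new Schema.Parser().parse(src.mkString).toString finally src.close()

df.foreachPartition { (rows: Iterator[Row]) =>
  val schema = new Schema.Parser().parse(schemaJson) // executor-local, cheap to re-parse
  val prod = new KafkaProducer[String, Array[Byte]](props)
  val writer = new GenericDatumWriter[GenericRecord](schema)

  rows.foreach { value =>
    val rec = new GenericData.Record(schema)
    rec.put("col1", value.getString(1))
    rec.put("col2", value.getString(2))
    rec.put("col3", value.getString(3))
    rec.put("col4", value.getString(4))

    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(rec, encoder)
    encoder.flush()

    prod.send(new ProducerRecord[String, Array[Byte]]("topic", value.getString(1), out.toByteArray))
  }
  prod.flush()
  prod.close()
}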
