I am trying to read data from a table through a SparkSession and publish it to a Kafka topic, using the following code:
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.io._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.serialization.ByteArraySerializer
import java.io.{ByteArrayOutputStream, StringWriter}
import java.util.Properties
import scala.io.Source
import org.apache.spark.sql.SparkSession
object Producer extends Serializable {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)

    // Parse the Avro schema from a file on the driver
    val lines = Source.fromFile("file").mkString
    val schema = new Schema.Parser().parse(lines)

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    import spark.implicits._

    val df = spark.sql("select * from table")
    df.rdd.map { value =>
      val prod = new KafkaProducer[String, Array[Byte]](props)

      // Build an Avro record from the row columns
      val records = new GenericData.Record(schema)
      records.put("col1", value.getString(1))
      records.put("col2", value.getString(2))
      records.put("col3", value.getString(3))
      records.put("col4", value.getString(4))

      // Serialize the record to Avro binary
      val writer = new SpecificDatumWriter[GenericRecord](schema)
      val out = new ByteArrayOutputStream()
      val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
      writer.write(records, encoder)
      encoder.flush()
      out.close()
      val serializedBytes: Array[Byte] = out.toByteArray()

      val record = new ProducerRecord[String, Array[Byte]]("topic", value.getString(1), serializedBytes)
      val data = prod.send(record)
      prod.flush()
      prod.close()
    }
    spark.close()
  }
}
When it runs, it throws the following error:
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
Serialization stack:
- object not serializable (class: org.apache.avro.Schema$RecordSchema, value: {"type":"record","name":"data","namespace":"com.data.record","fields":[{"name":"col1","type":"string"},{"name":"col2","type":"string"},{"name":"col3","type":"string"},{"name":"col4","type":"string"}]})
- field (class: scala.runtime.ObjectRef, name: elem, type: class java.lang.Object)
- object (class scala.runtime.ObjectRef, {"type":"record","name":"data","namespace":"com.data.record","fields":[{"name":"col1","type":"string"},{"name":"col2","type":"string"},{"name":"col3","type":"string"},{"name":"col4","type":"string"}]})
- field (class: com.kafka.driver.kafkaproducer.Producer$$anonfun$main$1, name: schema$1, type: class scala.runtime.ObjectRef)
However, when I pull the dataset back to the driver with df.rdd.collect.foreach, it works fine. But I need to publish the messages from the executors (at the cluster level), hence the rdd.map. I am not sure what exactly I am missing that causes this error. Any help in resolving it would be appreciated, thanks!
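For reference, the driver-side variant that did work looks roughly like this. It is only a sketch assuming the same props, schema, file path, and topic placeholders as above; since everything runs on the driver, no closure containing the non-serializable Schema is ever shipped to executors.

// Driver-side variant: collect the rows and publish from the driver.
val prod = new KafkaProducer[String, Array[Byte]](props)
val writer = new GenericDatumWriter[GenericRecord](schema)
df.rdd.collect.foreach { value =>
  val rec = new GenericData.Record(schema)
  rec.put("col1", value.getString(1))
  rec.put("col2", value.getString(2))
  rec.put("col3", value.getString(3))
  rec.put("col4", value.getString(4))
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get().binaryEncoder(out, null)
  writer.write(rec, encoder)
  encoder.flush()
  prod.send(new ProducerRecord[String, Array[Byte]]("topic", value.getString(1), out.toByteArray))
}
prod.flush()
prod.close()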
1 Answer
Found that the objects, the schema, and the Kafka producer need to be made available on the executors. To do that, the above code was modified as below.
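A minimal sketch of that idea (the answer's exact code may differ in details): parse the schema and create the producer inside foreachPartition, which is an action (unlike the lazy map in the question), so each executor builds its own non-serializable objects instead of receiving them from the driver. The file path, topic name, and column indices below are the same placeholders as in the question.

// Sketch: build the Schema and KafkaProducer on the executors, one set per partition.
df.rdd.foreachPartition { partition =>
  // Assumes the schema file is readable from the executor nodes; alternatively,
  // capture the schema JSON string (which is serializable) and parse it here.
  val schema = new Schema.Parser().parse(Source.fromFile("file").mkString)
  // props must also contain bootstrap.servers and only serializable entries.
  val prod = new KafkaProducer[String, Array[Byte]](props)
  val writer = new GenericDatumWriter[GenericRecord](schema)
  partition.foreach { value =>
    val rec = new GenericData.Record(schema)
    rec.put("col1", value.getString(1))
    rec.put("col2", value.getString(2))
    rec.put("col3", value.getString(3))
    rec.put("col4", value.getString(4))
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(rec, encoder)
    encoder.flush()
    prod.send(new ProducerRecord[String, Array[Byte]]("topic", value.getString(1), out.toByteArray))
  }
  prod.flush()
  prod.close()
}

Creating one producer per partition (rather than per record, as in the question) also avoids opening and closing a Kafka connection for every row.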