如何修复spark streaming kafka consumer中的“java.io.notserializableexception:org.apache.kafka.clients.consumer.consumerrecord”?

esbemjvw  于 2021-06-06  发布在  Kafka
关注(0)|答案(3)|浏览(689)

Spark2.0.0
apacheKafka0.10.1.0
scala 2.11.8版
当我使用spark streaming和kafka与kafka broker版本0.10.1.0的集成以及以下scala代码时,它失败了,出现以下异常:

16/11/13 12:55:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

为什么?如何修复?
代码:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark._
import org.apache.commons.codec.StringDecoder
import org.apache.spark.streaming._

object KafkaConsumer_spark_test {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("KafkaConsumer_spark_test").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("./checkpoint")
    val kafkaParams =Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("local1")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    stream.map(record => (record.key, record.value))
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

例外情况:

16/11/13 12:55:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/11/13 12:55:20 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11); not retrying
16/11/13 12:55:20 ERROR JobScheduler: Error running job streaming job 1479012920000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122)
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122)
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
new9mtju

new9mtju1#

kafkautils.createdirectstream创建为org.apache.spark.streaming.dstream.dstream。它不是rdd。spark streaming将按原样临时创建RDD。要检索rdd,请使用stream.foreach()获取rdd,然后使用rdd.foreach获取rdd中的每个对象。这些将是kafka consumerrecords,您可以使用value()方法读取kafka主题中的消息:

stream.foreachRDD { rdd => 
    rdd.foreach { record => 
    val value = record.value()
    println(map.get(value)) 
    }
}
wfsdck30

wfsdck302#

当执行需要序列化的操作(如持久化或窗口、打印)时,consumerrecord不实现序列化。您需要添加以下配置以避免错误。

sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerialize");
    sparkConf.registerKryoClasses((Class<ConsumerRecord>[] )Arrays.asList(ConsumerRecord.class).toArray());
iyfamqjs

iyfamqjs3#

从dstream接收使用者记录对象。当您尝试打印它时,它会出错,因为该对象不可序列化。相反,您应该从consumerrecord对象获取值并打印它。
执行以下操作,而不是stream.print():

stream.map(record=>(record.value().toString)).print

这应该能解决你的问题。
明白了
对于其他看到此异常的人,任何调用 checkpoint 我会叫一个 persiststorageLevel = MEMORY_ONLY_SER ,所以不要打电话 checkpoint 直到你打电话来 map

相关问题