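I am trying to publish a Spark RDD stream to Kafka directly from the worker nodes, rather than collecting it on the driver first. So I wrote the following code: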
def writeToKafka[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], topic: String, keySerializerClass: String, valueSerializerClass: String, brokers: String = producerBroker) = {
  rdd.foreachPartition { partitionOfRecords =>
    val producer = new KafkaProducer[K, V](getProducerProps(keySerializerClass, valueSerializerClass, brokers))
    partitionOfRecords.foreach { message =>
      producer.send(new ProducerRecord[K, V](topic, message._1, message._2))
    }
    producer.close()
  }
}

def getProducerProps(keySerializerClass: String, valueSerializerClass: String, brokers: String): Properties = {
  val producerProps: Properties = new Properties
  producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass)
  producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass)
  producerProps
}
Running this code results in the following exception:
15/09/01 15:13:00 ERROR JobScheduler: Error running job streaming job 1441120380000 ms.3
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:805)
at com.company.opt.detector.StreamingDetector.writeToKafka(StreamingDetector.scala:84)
at com.company.opt.MyClass.MyClass$$anonfun$doStreamingWork$3.apply(MyClass.scala:47)
at com.company.opt.MyClass.MyClass$$anonfun$doStreamingWork$3.apply(MyClass.scala:47)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.company.opt.MyClass.MyClass$
Serialization stack:
- object not serializable (class: com.company.opt.MyClass.MyClass$, value: com.company.opt.MyClass.MyClass$@7e2bb5e0)
- field (class: com.company.opt.detector.StreamingDetector$$anonfun$writeToKafka$1, name: $outer, type: class com.company.opt.detector.StreamingDetector)
- object (class com.company.opt.detector.StreamingDetector$$anonfun$writeToKafka$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 21 more
But when I move the logic from the getProducerProps function directly into my writeToKafka function, as follows, everything works fine.
def writeToKafka[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], topic: String, keySerializerClass: String, valueSerializerClass: String, brokers: String = producerBroker) = {
  rdd.foreachPartition { partitionOfRecords =>
    val producerProps: Properties = new Properties
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass)
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass)
    val producer = new KafkaProducer[K, V](producerProps)
    partitionOfRecords.foreach { message =>
      producer.send(new ProducerRecord[K, V](topic, message._1, message._2))
    }
    producer.close()
  }
}
Can someone explain why this happens? Thanks.
2 answers
ftf50wuq1#
I agree with maasg's answer. You may also find this article interesting: it explores how to control which data in a closure gets serialized by Spark.
rdlzhqv92#
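Given that getProducerProps is a method of the class enclosing it, calling it from the closure is equivalent to calling this.getProducerProps(...). The problem then becomes evident: this is pulled into the closure and has to be serialized together with all of its fields. That is the $outer field shown in the serialization stack above. Some member of that class is not serializable, which produces this exception. A good practice is to keep such methods in a separate object.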
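A minimal sketch of the separate-object approach, runnable without Spark or Kafka. The names ProducerUtils, Detector and ClosureCheck are hypothetical, the plain string keys stand in for the ProducerConfig constants, and isSerializable only approximates Spark's closure check by attempting Java serialization. It assumes Scala 2.12 or later.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.util.Properties

// Hypothetical helper object (the name is an assumption, not from the question).
// A top-level object is reached statically, so a closure that calls it
// does not need to capture any enclosing instance.
object ProducerUtils {
  def getProducerProps(keySerializerClass: String, valueSerializerClass: String, brokers: String): Properties = {
    val props = new Properties
    props.put("bootstrap.servers", brokers)            // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
    props.put("key.serializer", keySerializerClass)     // ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
    props.put("value.serializer", valueSerializerClass) // ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
    props
  }
}

// Stand-in for the enclosing class in the question: deliberately not Serializable.
class Detector {
  def getProps(brokers: String): Properties =
    ProducerUtils.getProducerProps("k", "v", brokers)

  // Calls this.getProps, so the closure captures `this`.
  def capturingClosure: String => Properties = brokers => getProps(brokers)

  // Calls the standalone object only, so nothing from Detector is captured.
  def cleanClosure: String => Properties =
    brokers => ProducerUtils.getProducerProps("k", "v", brokers)
}

object ClosureCheck {
  // Approximation of the check: try to Java-serialize the closure.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val d = new Detector
    println(isSerializable(d.capturingClosure)) // false: `this` is not serializable
    println(isSerializable(d.cleanClosure))     // true: no enclosing instance captured
  }
}
```

In writeToKafka, the equivalent change would be to call ProducerUtils.getProducerProps(...) inside foreachPartition instead of the instance method.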
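Another approach is to turn the method into a function and assign it to a val. The closure then uses the value of the val, so the whole instance is not pulled into the serializable closure. Note that this holds when the val is local to the method, or is copied into a local val before the closure; reading a val field directly inside the closure still goes through this.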
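A minimal sketch of the function-as-val approach, again without Spark or Kafka. FnDetector and FnClosureCheck are hypothetical names, the string keys stand in for the ProducerConfig constants, and the serialization test only approximates Spark's closure check. It assumes Scala 2.12 or later.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.util.Properties

// Stand-in for the enclosing class in the question: deliberately not Serializable.
class FnDetector {
  // A function value instead of a method. Its body touches no members of FnDetector.
  val producerProps: (String, String, String) => Properties =
    (keySerializerClass, valueSerializerClass, brokers) => {
      val props = new Properties
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer", keySerializerClass)
      props.put("value.serializer", valueSerializerClass)
      props
    }

  // Reads the field inside the closure, i.e. this.producerProps: still captures `this`.
  def fieldAccessClosure: String => Properties =
    brokers => producerProps("k", "v", brokers)

  // Copies the function value to a local val first: only that value is captured.
  def localCopyClosure: String => Properties = {
    val makeProps = producerProps
    brokers => makeProps("k", "v", brokers)
  }
}

object FnClosureCheck {
  // Approximation of the check: try to Java-serialize the closure.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val d = new FnDetector
    println(isSerializable(d.fieldAccessClosure)) // false: `this` is captured
    println(isSerializable(d.localCopyClosure))   // true: only the function value is captured
  }
}
```

The local copy is the part that matters: it turns a field access through this into a captured local value, which is all the closure then has to serialize.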