Spark Streaming: RDD returned from a Kafka stream does nothing inside foreachRDD

ykejflvf posted on 2021-06-08 in Kafka

I am facing a strange issue here: I am reading Avro records from Kafka, trying to deserialize them, and store them into a file. I can get the records from Kafka, but when I try to apply a function to the RDD's records, it refuses to do anything.

import java.nio.ByteBuffer
import java.util.UUID
import io.confluent.kafka.serializers.KafkaAvroDecoder
import com.my.project.avro.AvroDeserializer
import com.my.project.util.SparkJobLogging
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka._
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.dstream.DStream

object KafkaConsumer extends SparkJobLogging {
  var schemaRegistry: SchemaRegistryClient = null
  val url = "url:8181"
  schemaRegistry = new CachedSchemaRegistryClient(url, 1000)

  def createKafkaStream(ssc: StreamingContext): DStream[(String,Array[Byte])] = {
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> "zk.server:2181",
      "group.id" -> s"${UUID.randomUUID().toString}",
      "auto.offset.reset" -> "smallest",
      "bootstrap.servers" -> "bootstrap.server:9092",
      "zookeeper.connection.timeout.ms" -> "6000",
      "schema.registry.url" ->"registry.url:8181"
    )

    val topic = "my.topic"
    KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Set(topic))
  }

  def processRecord(avroStream: Array[Byte]) = {
    println(AvroDeserializer.toRecord(avroStream, schemaRegistry))
  }

  def main(args: Array[String]) = {
    val sparkConf = new SparkConf().setAppName("AvroDeserilizer")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val topicStream = createKafkaStream(ssc).map(_._2)
    topicStream.foreachRDD(
      rdd => if (!rdd.isEmpty()){
        logger.info(rdd.count())
        rdd.foreach(avroRecords => processRecord(avroRecords))
      }
    )
    ssc.start()
    ssc.awaitTermination()
  }
}

object AvroDeserializer extends SparkJobLogging{
  def toRecord(buffer: Array[Byte], registry: SchemaRegistryClient): GenericRecord = {
    val bb = ByteBuffer.wrap(buffer)
    bb.get() // consume MAGIC_BYTE
    val schemaId = bb.getInt // consume schemaId
    val schema = registry.getByID(schemaId) // consult the Schema Registry
    val reader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(buffer, bb.position(), bb.remaining(), null)
    reader.read(null, decoder) //null -> as we are not providing any datum
  }
}

Up to the statement logger.info(rdd.count()) everything works fine, and I can see the exact record count in the log. After that, however, nothing has any effect. When I try

val record= rdd.first()
processRecord(record)

it works, but rdd.foreach(avroRecords => processRecord(avroRecords)) and rdd.map(avroRecords => processRecord(avroRecords)) do not. Every streaming batch just prints the following:

17/05/14 01:01:24 INFO scheduler.DAGScheduler: Job 2 finished: foreach at KafkaConsumer.scala:56, took 42.684999 s
 17/05/14 01:01:24 INFO scheduler.JobScheduler: Finished job streaming job 1494738000000 ms.0 from job set of time 1494738000000 ms
 17/05/14 01:01:24 INFO scheduler.JobScheduler: Total delay: 84.888 s for time 1494738000000 ms (execution: 84.719 s)
 17/05/14 01:01:24 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
 17/05/14 01:01:24 INFO scheduler.InputInfoTracker: remove old batch metadata: 
 17/05/14 01:01:26 INFO yarn.YarnAllocator: Canceling requests for 0 executor containers
 17/05/14 01:01:26 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
 17/05/14 01:01:29 INFO yarn.YarnAllocator: Canceling requests for 0 executor containers
 17/05/14 01:01:29 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.

It just keeps printing the last two lines of the log until the next streaming batch is triggered.


j5fpnvbx1#

Although the approach above did not work for me, I found a different approach in the Confluent documentation. The KafkaAvroDecoder talks to the Schema Registry, fetches the schema, and deserializes the data, so it removes the need for a custom deserializer.

import io.confluent.kafka.serializers.KafkaAvroDecoder

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> brokers,
  "schema.registry.url" -> schemaRegistry,
  "key.converter.schema.registry.url" -> schemaRegistry,
  "value.converter.schema.registry.url" -> schemaRegistry,
  "auto.offset.reset" -> "smallest")
val topicSet = Set(topics)
val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicSet).map(_._2)
messages.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    logger.info(rdd.count())
    rdd.saveAsTextFile("/data/")
  }
}
ssc.start()
ssc.awaitTermination()

Dependency jar: kafka-avro-serializer-3.1.1.jar. This is working perfectly for me now, and I hope it helps someone in the future.
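
For reference, a minimal sketch of pulling in that dependency with sbt; the io.confluent coordinates and the Confluent Maven repository below are assumptions, not part of the original answer:

// build.sbt (sketch; coordinates and repository are assumed, version taken from the jar name above)
resolvers += "Confluent" at "https://packages.confluent.io/maven/"
libraryDependencies += "io.confluent" % "kafka-avro-serializer" % "3.1.1"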


kxe2p93d2#

Your println statements are running on the distributed workers, not in the current process, so you don't see them. You could try swapping println for log.info to verify.
Ideally you should convert the DStream[Array[Byte]] into a DStream[GenericRecord] and write it to files using .saveAsTextFiles or similar. You may need a stream.take() since the stream could be infinite.
http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
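
A minimal sketch of that suggestion against the question's code, assuming the schema-registry client can be recreated on each partition (the driver-side instance is not serializable, so it cannot be captured in the closure):

val recordStream: DStream[String] = topicStream.mapPartitions { iter =>
  // one client per partition, created on the executor rather than the driver
  val registry = new CachedSchemaRegistryClient("registry.url:8181", 1000)
  iter.map(bytes => AvroDeserializer.toRecord(bytes, registry).toString)
}
recordStream.saveAsTextFiles("/data/avro-records") // writes one directory per batch interval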


sg24os4d3#

val topicStream = createKafkaStream(ssc).map(_._2)
topicStream.foreachRDD(
  rdd => if (!rdd.isEmpty()){
    logger.info(rdd.count())
    rdd.foreach(avroRecords => processRecord(avroRecords))
  }
)

foreachRDD is a powerful primitive that allows data to be sent out to external systems. However, it is important to understand how to use this primitive correctly and efficiently. Some of the common mistakes to avoid are as follows.
DStreams are executed lazily by the output operations, just like RDDs are lazily executed by RDD actions. Specifically, RDD actions inside the DStream output operations force the processing of the received data. Hence, if your application does not have any output operation, or has output operations like dstream.foreachRDD() without any RDD action inside them, then nothing will get executed. The system will simply receive the data and discard it.
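
A minimal sketch of the pattern that guide recommends, with a hypothetical createConnection()/send() helper standing in for whatever external system is targeted (not part of the original code): the RDD action inside foreachRDD is what actually forces the work, and the connection is created per partition on the executors:

topicStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // hypothetical helper: open the connection on the executor, not the driver,
    // so nothing non-serializable is captured by the closure
    val connection = createConnection()
    partition.foreach(record => connection.send(record))
    connection.close()
  }
}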
