Spark流和消费Kafka头

jm2pwxwz  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(241)

我尝试在我的spark流媒体应用程序中读取kafka头(因为只有spark 3.0以后的结构化流媒体才支持读取头),我尝试使用dstream api。
然而,我得到的标题不是消息中的实际标题。
这是我正在使用的代码

try {
      val sparkStreamingSession = new StreamingContext(spark.sparkContext, Seconds(1))
      val kafkaParams: Map[String, Object] = Map(
        "bootstrap.servers" -> kafkaBrokers,
        "key.serializer" -> classOf[StringSerializer],
        "value.serializer" -> classOf[StringSerializer],
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "auto.offset.reset" -> "latest",
        "group.id" -> "my_streaming_client",
        "enable.auto.commit" -> false.asInstanceOf[Object]
      )
      val topics = Array(topicName)
      val kafkaDStream = KafkaUtils.createDirectStream(
        sparkStreamingSession,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )
      val processedStream = kafkaDStream.map(record => {
        val headers: Headers = record.headers()
        for (header <- headers.toArray) {
          println(".Header Name =" + header.key())
          println(".Header Value=" + new String(header.value()))
        }
      })
      processedStream.print()
      sparkStreamingSession.start()
      sparkStreamingSession.awaitTermination()
    }
    catch {
      case e: Exception => {
        e.printStackTrace()
        logger.error("Exception in convert ={}", e)
      }
      case unknown: Exception => {
        println(s"Unknown exception: $unknown")
      }
    }

这是我在日志里看到的。

.Header Name =task.generation
.Header Value=1
.Header Name =task.id
.Header Value=0
.Header Name =current.iteration
.Header Value=3955984

不知道Kafka消息的实际头发生了什么。
谢谢你的帮助。
谢谢

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题