Kafka Spark Streaming不执行foreach中的代码行

lmvvr0a8  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(106)

关于Spark Streaming的快速问题。
我将KafkaUtils中的DirectStream初始化为stream,并将其保存为spark-streaming中的InputDStream,如下所示。

val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      context,
      PreferConsistent,
      Subscribe[String, String](topicList, KafkaParameters)
    )

  stream
      .foreachRDD { rdd =>
        println("Executing within rdd")
        val rddSize = rdd.count()
        if (rddSize > 0) { println(s"Received data $rddSize") }
        else {println("Not received any data")}
      }

  context.start()

我可以看到近50分钟的生产数据的输出。50分钟后,我可以在日志中看到以下消息,

Seeking to LATEST offset of partition topic_name-partition_number
Resetting offset for partition topic_name-partition_number to offset 12908.

但是没有更多的日志说“在rdd内执行”或“接收到的数据$rddSize”或“没有接收到任何数据”
当我启动消费者时,整个逻辑工作得很好,但过了一段时间,它就停止工作了。你知道这是怎么回事吗

eoigrqb6

eoigrqb61#

您没有显示KafkaParameters,但任何Kafka消费者都默认设置auto.offset.reset=latest,这意味着没有任何可消费的偏移量。正如你的日志所说,消费者正在重新设置并再次寻找主题的结尾。您需要一个正在运行的生产者来查看该配置的任何数据
此外,Spark Streaming(这不是Kafka Steams)已被弃用,Spark结构化流的所有功能都类似地工作,至少对于Kafka来说是这样。

相关问题