如何在sparkstreaming中停止directstream后保存数据以供以后处理?

vlju58qv  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(359)

我在Kafka迪的下面创作。

val messages = KafkaUtils.createDirectStream[String, String](
         ssc,
         LocationStrategies.PreferConsistent,
         ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

然后将值另存为:

val lines = messages.map(_.value)

然后,当我没有更多的偏移量可供使用时,停止流式处理上下文,如下所示:

lines.foreachRDD(rdd => {
      if(rdd.isEmpty()) {
        messages.stop()
        ssc.stop(false)
      } else {

      }
  })

然后我打印 lines 具体如下:

lines.print()

然后我开始流为:

ssc.start()

它工作得很好。它读取RDD并打印前10条,停止消息流并停止流上下文。但当我执行同一行时 lines.print() 它抛出一个异常,表示在停止streamingcontext之后不能执行新的输入、转换或输出。
我如何实现我的目标?我在spark shell中运行它,而不是作为二进制文件(强制要求)。
以下是我真正想要达到的目标:
1) 使用kafka主题中的所有json记录。
2) 停止获取更多记录(保证消费后,Kafka主题中不会添加新记录,所以不想继续处理no records。)
3) 通过从json字段中提取一些字段来进行一些预处理。
4) 对预处理后的数据做进一步的操作。
5) 完成。

5sxhfpxr

5sxhfpxr1#

当您再次调用“lines.print()”时,它将再次尝试调用转换“messages.map(\ u.value)”。当你停止上下文时,它就失败了。
通过在停止上下文之前执行操作来保存lines变量。

相关问题