如何在spark streaming上下文中调用stop()后继续批处理

ijxebb2r  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(418)

我们正在写一篇关于Spark批/流处理效率的论文。我们试图在大量数据中检测异常,我们需要的是哪条测井线经历了哪种过程。
因此,我们创建了一个事件的模拟,在每个进程之前和之后,我们记录行到达/离开该阶段的时间。
但我们面临的一个问题是,我们不希望在这些计算中包含我们分析流处理的时间。所以我们基本上需要的是
使用流式处理进行一些计算,调用 ssc.stop(false,true) (通过http或检测文件结尾),继续处理有关性能的分析
但是spark的问题是,它不允许我们在调用stop之后处理数据流。有没有办法复制上一个数据流,以便调用后可以访问它的对象 stop() ?
尝试执行此操作时出现的错误是:

Exception in thread "main" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported

代码模式基本上是这样的:

val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[" + CPUNumber + "]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    //Some ml algorithms
    val x = b.map(something)
    ssc.start()

    ssc.awaitTermination()
    ssc.stop(false)

    //Some analytical tracking map reduce jobs
    val y = x.map(getanalytics)

提前谢谢,任何想法都非常感谢

jum4pzuy

jum4pzuy1#

你可以这样做:

b.foreachRDD(r -> stopwatch = Stopwatch.createStarted())
b.map(something)
b.foreachRDD(r -> System.out.println(stopwatch)
ssc.start()

这样你就可以在每轮比赛中都很费时了。

相关问题