我们正在写一篇关于Spark批/流处理效率的论文。我们试图在大量数据中检测异常,我们需要的是哪条测井线经历了哪种过程。
因此,我们创建了一个事件的模拟,在每个进程之前和之后,我们记录行到达/离开该阶段的时间。
但我们面临的一个问题是,我们不希望在这些计算中包含我们分析流处理的时间。所以我们基本上需要的是
使用流式处理进行一些计算,调用 ssc.stop(false,true)
(通过http或检测文件结尾),继续处理有关性能的分析
但是spark的问题是,它不允许我们在调用stop之后处理数据流。有没有办法复制上一个数据流,以便调用后可以访问它的对象 stop()
?
尝试执行此操作时出现的错误是:
Exception in thread "main" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
代码模式基本上是这样的:
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[" + CPUNumber + "]")
val ssc = new StreamingContext(sparkConf, Seconds(1))
//Some ml algorithms
val x = b.map(something)
ssc.start()
ssc.awaitTermination()
ssc.stop(false)
//Some analytical tracking map reduce jobs
val y = x.map(getanalytics)
提前谢谢,任何想法都非常感谢
1条答案
按热度按时间jum4pzuy1#
你可以这样做:
这样你就可以在每轮比赛中都很费时了。