我是新来spark的,我有一个问题。我有Spark流应用程序使用Kafka。如果新批处理为空(假设batchduration=15分钟),有没有方法告诉我的应用程序关闭?
3pmvbmvn1#
我们应该做的是:
dstream.foreachRDD{rdd => if (rdd.isEmpty) { streamingContext.stop() } }
但是请注意,根据应用程序工作流的不同,第一批(或中间的某批)也可能是空的,因此您的作业将在第一次运行时停止。您可能需要结合一些条件以获得更可靠的停止。
1条答案
按热度按时间3pmvbmvn1#
我们应该做的是:
但是请注意,根据应用程序工作流的不同,第一批(或中间的某批)也可能是空的,因此您的作业将在第一次运行时停止。您可能需要结合一些条件以获得更可靠的停止。