我们有一个spark流应用程序,它从接收器中的kafka队列读取数据,并进行一些转换和输出到hdfs。间歇时间是1分钟,我们已经调整了背压和压力 spark.streaming.receiver.maxRate
参数,所以大部分时间工作正常。
但我们还有一个问题。当hdfs完全关闭时,批处理作业将长时间挂起(假设hdfs没有工作4个小时,作业将挂起4个小时),但是接收器不知道作业没有完成,所以它仍然在接收接下来4个小时的数据。这导致oom异常,整个应用程序都关闭了,我们丢失了很多数据。
所以,我的问题是:有没有可能让接收者知道作业没有完成,这样它将接收到更少(甚至没有)的数据,而当作业完成时,它将开始接收更多的数据以赶上进度。在上述情况下,当hdfs关闭时,接收机将从kafka读取较少的数据,并且在未来4小时内生成的块非常小,接收机和整个应用程序没有关闭,在hdfs正常后,接收机将读取更多的数据并开始追赶。
1条答案
按热度按时间6za6bjd01#
可以通过设置属性来启用背压
spark.streaming.backpressure.enabled=true
. 这将动态地修改批处理大小,并将避免从队列构建中获得oom的情况。它有几个参数:spark.streaming.backpressure.pid.proportional-上一批大小错误的响应信号(默认值为1.0)
spark.streaming.backpressure.pid.integral-对累积误差的响应信号-有效地抑制(默认值为0.2)
spark.streaming.backpressure.pid.derived-对错误趋势的响应(用于快速响应更改,默认值为0.0)
spark.streaming.backpressure.pid.minrate-批处理频率暗示的最小速率,更改它以减少高吞吐量作业中的未及点(默认值100)
默认值很好,但我在这里模拟了算法对各种参数的响应