我有一个结构化的流媒体应用程序,我想注册一个StreamingListener
,我该怎么做?我似乎只能通过sparkSession.streams.addListener()
注册StreamingQueryListener
。
我知道如何将StreamingListener
添加到StreamingContext
中,但问题似乎是在获取StreamingContext
时,如果我从SparkSession.sparkContext
创建StreamingContext
,我认为如果我继续从SparkSession
而不是我创建的StreamingContext
进行流式传输,StreamingListener
将无法工作。
下面是我的StreamingListener
类:
import io.prometheus.client.{CollectorRegistry, Gauge}
import io.prometheus.client.exporter.PushGateway
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
class MicroBatchStatsListener(val pushGateway: PushGateway, jobName: String) extends StreamingListener {
private val processingTimeGauge = Gauge.build()
.name("processingTimeGauge")
.help("Time it took to process this microbatch")
.register(CollectorRegistry.defaultRegistry)
private val schedulingDelayGauge = Gauge.build()
.name("schedulingDelayGauge")
.help("Scheduling delay of this microbatch")
.register(CollectorRegistry.defaultRegistry)
private val numRecordsGauge = Gauge.build()
.name("numRecordsGauge")
.help("Number of records received in this mircobatch")
.register(CollectorRegistry.defaultRegistry)
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
val processingTime = if (batchCompleted.batchInfo.processingDelay.isDefined) {
batchCompleted.batchInfo.processingDelay.get / 1000
} else {
0
}
val schedulingDelay = if (batchCompleted.batchInfo.schedulingDelay.isDefined) {
batchCompleted.batchInfo.schedulingDelay.get / 1000
} else {
0
}
val numRecords = batchCompleted.batchInfo.numRecords
processingTimeGauge.set(processingTime)
schedulingDelayGauge.set(schedulingDelay)
numRecordsGauge.set(numRecords)
pushGateway.push(CollectorRegistry.defaultRegistry, jobName)
}
}
如果我不能将它连接到SparkSession
,那么我如何将它连接到StreamingQueryListener
?我不明白如何从那里获得这些指标:
1.调度延迟
1.处理时间
1条答案
按热度按时间kmpatx3s1#
事实证明,不可能为结构化流注册
StreamingListener
,因为如果您使用结构化流,StreamingContext
不可用。获取StreamingContext
的This方法在这种情况下不起作用,因为它创建了一个单独的StreamingContext
,如果您使用它注册侦听器,它们将无法像您使用结构化流一样工作。但是所有这些指标,实际上都可以用
StreamingQueryListener
计算,它具有的指标的示例如下:例如,我们可以将
durationMs
中的所有值相加,得到总处理时间。如果您知道该Map中提到的操作顺序,则也可以从
durationMs
计算调度延迟。所以,你必须把所有在triggerExecution
之前发生的操作加起来,你就会得到你的调度延迟。输入记录的数量也在那里。
所有这些信息都可以在
StreamingQueryListener
的onQueryProgress
方法的StreamingQueryListener.QueryProgressEvent
类中找到,该方法在每个微批处理后触发。更新:发现了类似的问题,并在此提供了答案:结构化Spark流度量检索