如何使用SparkSession注册StreamingListener?

x4shl7ld  于 2023-06-24  发布在  Apache
关注(0)|答案(1)|浏览(112)

我有一个结构化的流媒体应用程序,我想注册一个StreamingListener,我该怎么做?我似乎只能通过sparkSession.streams.addListener()注册StreamingQueryListener
我知道如何将StreamingListener添加到StreamingContext中,但问题似乎是在获取StreamingContext时,如果我从SparkSession.sparkContext创建StreamingContext,我认为如果我继续从SparkSession而不是我创建的StreamingContext进行流式传输,StreamingListener将无法工作。
下面是我的StreamingListener类:

import io.prometheus.client.{CollectorRegistry, Gauge}
import io.prometheus.client.exporter.PushGateway
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class MicroBatchStatsListener(val pushGateway: PushGateway, jobName: String) extends StreamingListener {

  private val processingTimeGauge = Gauge.build()
    .name("processingTimeGauge")
    .help("Time it took to process this microbatch")
    .register(CollectorRegistry.defaultRegistry)

  private val schedulingDelayGauge = Gauge.build()
    .name("schedulingDelayGauge")
    .help("Scheduling delay of this microbatch")
    .register(CollectorRegistry.defaultRegistry)

  private val numRecordsGauge = Gauge.build()
    .name("numRecordsGauge")
    .help("Number of records received in this mircobatch")
    .register(CollectorRegistry.defaultRegistry)

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val processingTime = if (batchCompleted.batchInfo.processingDelay.isDefined) {
      batchCompleted.batchInfo.processingDelay.get / 1000
    } else {
      0
    }

    val schedulingDelay = if (batchCompleted.batchInfo.schedulingDelay.isDefined) {
      batchCompleted.batchInfo.schedulingDelay.get / 1000
    } else {
      0
    }

    val numRecords = batchCompleted.batchInfo.numRecords

    processingTimeGauge.set(processingTime)
    schedulingDelayGauge.set(schedulingDelay)
    numRecordsGauge.set(numRecords)

    pushGateway.push(CollectorRegistry.defaultRegistry, jobName)
  }
}

如果我不能将它连接到SparkSession,那么我如何将它连接到StreamingQueryListener?我不明白如何从那里获得这些指标:
1.调度延迟
1.处理时间

kmpatx3s

kmpatx3s1#

事实证明,不可能为结构化流注册StreamingListener,因为如果您使用结构化流,StreamingContext不可用。获取StreamingContextThis方法在这种情况下不起作用,因为它创建了一个单独的StreamingContext,如果您使用它注册侦听器,它们将无法像您使用结构化流一样工作。
但是所有这些指标,实际上都可以用StreamingQueryListener计算,它具有的指标的示例如下:

23/06/20 17:59:15 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "4005e0e1-de55-4e9e-a01f-2bd3eec3d77e",
  "runId" : "9d847913-72f8-4410-8b5f-0c69b798bc3c",
  "name" : null,
  "timestamp" : "2023-06-20T14:59:15.184Z",
  "batchId" : 3,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 666.6666666666667,
  "processedRowsPerSecond" : 47.16981132075472,
  "durationMs" : {
    "addBatch" : 31,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 4,
    "setOffsetRange" : 0,
    "triggerExecution" : 212,
    "walCommit" : 112
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=10, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 2,
    "endOffset" : 3,
    "numInputRows" : 10,
    "inputRowsPerSecond" : 666.6666666666667,
    "processedRowsPerSecond" : 47.16981132075472
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@3d192c1a"
  }
}

例如,我们可以将durationMs中的所有值相加,得到总处理时间。
如果您知道该Map中提到的操作顺序,则也可以从durationMs计算调度延迟。所以,你必须把所有在triggerExecution之前发生的操作加起来,你就会得到你的调度延迟。
输入记录的数量也在那里。
所有这些信息都可以在StreamingQueryListeneronQueryProgress方法的StreamingQueryListener.QueryProgressEvent类中找到,该方法在每个微批处理后触发。

更新:发现了类似的问题,并在此提供了答案:结构化Spark流度量检索

相关问题