Spark中的StreamQueryListener不执行onQueryProgress()中的代码

fcg9iug3  于 2023-04-21  发布在  Apache
关注(0)|答案(1)|浏览(112)

我从Databricks delta table中阅读数据作为流并将其写入另一个delta table(使用屏幕截图中的控制台以便于调试),我想利用spark的StreamingQueryListener()并使用onQueryProgress()打印代码片段中批次的输入行以进行调试。不知道我错过了什么!
这让我想到了这个功能是否只适用于Kafka,但是我使用Kafka源代码也得到了同样的结果。

%scala
import org.apache.spark.sql.streaming._
val streamingCountsListener = new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: StreamingQueryListener.QueryStartedEvent): Unit = {
      println("query started")
  }
  override def onQueryTerminated(queryTerminated: StreamingQueryListener.QueryTerminatedEvent): Unit = {
        println("query made stopped")
      
  }
  override def onQueryProgress(queryProgress: StreamingQueryListener.QueryProgressEvent): Unit = {
    queryProgress.progress.sources.foreach(src => {
      println(src.numInputRows)
      println("\n\n\n\n")
    })
  }
}

// Add this query listener to the session
spark.streams.addListener(streamingCountsListener)

var x = spark.readStream.format("delta")
  .option("ignoreChanges", "true")
  .table(s"qastg.student")

val query = x.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()

Code Snippet
尝试使用Kafka源代码并使用简单的print语句进行调试,但不起作用

uelo1irk

uelo1irk1#

将事件传递给查询侦听器:

queryStarted: StreamingQueryListener.QueryStartedEvent queryStarted

相关问题