我从Databricks delta table中阅读数据作为流并将其写入另一个delta table(使用屏幕截图中的控制台以便于调试),我想利用spark的StreamingQueryListener()并使用onQueryProgress()打印代码片段中批次的输入行以进行调试。不知道我错过了什么!
这让我想到了这个功能是否只适用于Kafka,但是我使用Kafka源代码也得到了同样的结果。
%scala
import org.apache.spark.sql.streaming._
val streamingCountsListener = new StreamingQueryListener() {
override def onQueryStarted(queryStarted: StreamingQueryListener.QueryStartedEvent): Unit = {
println("query started")
}
override def onQueryTerminated(queryTerminated: StreamingQueryListener.QueryTerminatedEvent): Unit = {
println("query made stopped")
}
override def onQueryProgress(queryProgress: StreamingQueryListener.QueryProgressEvent): Unit = {
queryProgress.progress.sources.foreach(src => {
println(src.numInputRows)
println("\n\n\n\n")
})
}
}
// Add this query listener to the session
spark.streams.addListener(streamingCountsListener)
var x = spark.readStream.format("delta")
.option("ignoreChanges", "true")
.table(s"qastg.student")
val query = x.writeStream
.format("console")
.outputMode("append")
.start()
query.awaitTermination()
Code Snippet
尝试使用Kafka源代码并使用简单的print语句进行调试,但不起作用
1条答案
按热度按时间uelo1irk1#
将事件传递给查询侦听器: