我是spark structure streaming和azure event hub的新手,并编写了以下代码:
val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()
import spark.implicits._
val body=incomingStream.select(incomingStream.col("body").cast("String"))
val query=body.writeStream.format("memory").outputMode("append").option("checkpointLocation",fileLocation+checkpointFolder).queryName("Body").start()
val data=spark.sqlContext.sql("select * from Body").map(x=>x(0).toString())
val result=spark.sqlContext.read.json(data)
result.write
.partitionBy(partitionColumn)
.option("path",fileLocation+eventHubName)
query.awaitTermination()
我无法获取val数据变量中的数据,也无法将其写入azure数据湖。我可以在日志中看到writestream正在内存中创建表 Body
但输出中没有任何内容。有人能看出我做错了什么吗?
暂无答案!
目前还没有任何答案,快来回答吧!