具有spark结构的azure eventhub流数据

qnyhuwrf  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(269)

我是spark structure streaming和azure event hub的新手,并编写了以下代码:

val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()
import spark.implicits._

val body=incomingStream.select(incomingStream.col("body").cast("String"))
val query=body.writeStream.format("memory").outputMode("append").option("checkpointLocation",fileLocation+checkpointFolder).queryName("Body").start()
val data=spark.sqlContext.sql("select * from Body").map(x=>x(0).toString())

val result=spark.sqlContext.read.json(data)
result.write
  .partitionBy(partitionColumn)
  .option("path",fileLocation+eventHubName)

query.awaitTermination()

我无法获取val数据变量中的数据,也无法将其写入azure数据湖。我可以在日志中看到writestream正在内存中创建表 Body 但输出中没有任何内容。有人能看出我做错了什么吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题