来自事件中心的数据是二进制/不可读的avro格式。需要在特定模式中转换为可读格式。
下面是尝试的代码:
%scala
// Configuration parameters for connecting to Event Hubs.
// Build connection string with the above information
val connectionString = ConnectionStringBuilder("our connection string).setEventHubName(EventHubName).build
val customEventhubParameters =
EventHubsConf(connectionString)
.setMaxEventsPerTrigger(1)
val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()
val messages = incomingStream.withColumn("Body", $"body".cast(StringType)).select("Body")
需要读入用户定义的.avsc模式并将其存储在Dataframe中以供以后处理
暂无答案!
目前还没有任何答案,快来回答吧!