如何使用文件存储架构反序列化来自事件中心的avro消息,并将其存储在azure databricks的Dataframe中

bprjcwpo  于 2021-05-19  发布在  Spark
关注(0)|答案(0)|浏览(210)

来自事件中心的数据是二进制/不可读的avro格式。需要在特定模式中转换为可读格式。
下面是尝试的代码:

%scala

// Configuration parameters for connecting to Event Hubs.
// Build connection string with the above information 

val connectionString =  ConnectionStringBuilder("our connection string).setEventHubName(EventHubName).build

val customEventhubParameters = 
  EventHubsConf(connectionString)
  .setMaxEventsPerTrigger(1)

val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()

val messages = incomingStream.withColumn("Body", $"body".cast(StringType)).select("Body")

需要读入用户定义的.avsc模式并将其存储在Dataframe中以供以后处理

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题