spark流式数据集

c2e8gylq  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(219)

我对databricks还比较陌生,正在尝试读取传入的传感器数据,并在每个数据集中触发一个规则集。寻求一些帮助和指导如何进一步进行

val connectionString = ConnectionStringBuilder("ConnectionString")   
    .setEventHubName("EventHubname")
    .build

  val eventHubsConf = EventHubsConf(connectionString)
    .setStartingPosition(EventPosition.fromEndOfStream)
    .setConsumerGroup("azuredatabricks")

  val eventhubs = spark.readStream
    .format("eventhubs")
    .options(eventHubsConf.toMap)
    .load() 

 /*
  val df = eventhubs.select(($"enqueuedTime").as("Enqueued_Time"),($"systemProperties.iothub- 
           connection-device-id") .as("Device_ID"),($"body".cast("string")).as("telemetry_json")) 

  df.createOrReplaceTempView("tel_table")

* /

上面的代码从eventhub读取传入的流数据,并在dataframe中加载数据(代码的注解部分)。现在我需要遍历每一行数据并将其传递给规则集。由于它是一个流数据,sparksql抛出错误。有没有人能指导一下如何按顺序读取这些流数据。
我寻求帮助的下一部分是如何从消息体推断模式。因为这些是传感器数据,所以模式不会是静态的。一条消息可以有设备ID、温度、ph值,另一条消息是设备、电压、电流。对于这样的情况,有一种方法可以在apachespark中动态推断模式。
非常感谢你的帮助。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题