我对databricks还比较陌生,正在尝试读取传入的传感器数据,并在每个数据集中触发一个规则集。寻求一些帮助和指导如何进一步进行
val connectionString = ConnectionStringBuilder("ConnectionString")
.setEventHubName("EventHubname")
.build
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)
.setConsumerGroup("azuredatabricks")
val eventhubs = spark.readStream
.format("eventhubs")
.options(eventHubsConf.toMap)
.load()
/*
val df = eventhubs.select(($"enqueuedTime").as("Enqueued_Time"),($"systemProperties.iothub-
connection-device-id") .as("Device_ID"),($"body".cast("string")).as("telemetry_json"))
df.createOrReplaceTempView("tel_table")
* /
上面的代码从eventhub读取传入的流数据,并在dataframe中加载数据(代码的注解部分)。现在我需要遍历每一行数据并将其传递给规则集。由于它是一个流数据,sparksql抛出错误。有没有人能指导一下如何按顺序读取这些流数据。
我寻求帮助的下一部分是如何从消息体推断模式。因为这些是传感器数据,所以模式不会是静态的。一条消息可以有设备ID、温度、ph值,另一条消息是设备、电压、电流。对于这样的情况,有一种方法可以在apachespark中动态推断模式。
非常感谢你的帮助。
暂无答案!
目前还没有任何答案,快来回答吧!