我有一个结构化的流式处理作业,它从kafka队列中获取json消息并将它们存储到parquet/hdfs中。从那里,消息被按照时间表获取和分析,并存储到mysql数据库中。
现在我想实现流式传输,从kafka那里获取json消息,分析它并将其存储到mysql中。
我现在的问题是spark结构化流媒体不能让我使用pivot。
我现在的代码如下所示:
val df = readFromHdfsParquet
val df_exploded= df.select($"device", $"owner", $"timestamp", explode($"channels").as("exploded")
.withColumn("channelName", $"exploded.channelName")
.withColumn($"value", $"exploded.value")
.drop("exploded")
val df_grouped = df_exploded
.groupBy($"device, $"owner", $"timestamp")
.pivot("channelName")
.agg(first($"value", false)
这将导致所需的输出结构包含所有可用通道。
我的json如下所示:
{
"device": "TypeA",
"owner": "me",
"timestamp": "2019-05-12 17:27:59",
"channels": [
{
"channelName": "temperature",
"state": 0,
"value": "27"
},
{
"channelName": "humidity",
"state": 0,
"value": "10"
}
]
}
通道阵列的长度未设置,可以在不同设备之间更改。
我想要的是具有以下结构的Dataframe,并将其存储到mysql中。
|device|owner|timestamp |temperature|humidity|
|TypeA |me |2019-05-12 17:27:59|27 |10 |
结构化流媒体是如何做到这一点的?通过显式选择几个通道而不是所有通道也就足够了(f、 e.温度(不含湿度)
暂无答案!
目前还没有任何答案,快来回答吧!