在我的Spark应用程序中,我试图读取通过套接字发送的JSON数据,数据是字符串格式的,例如{"deviceId": "1", "temperature":4.5}
。
我创建了一个模式,如下所示:
StructType dataSchema = new StructType()
.add("deviceId", "string")
.add("temperature", "double");
我编写了下面的代码来提取字段,将它们转换为列,这样我就可以在SQL查询中使用它。
Dataset<Row> normalizedStream = stream.select(functions.from_json(new Column("value"),dataSchema)).as("json");
Dataset<Data> test = normalizedStream.select("json.*").as(Encoders.bean(Data.class));
test.printSchema();
数据类
public class Data {
private String deviceId;
private double temperature;
}
但是当我提交Spark应用程序时,输出模式如下所示。
root
|-- from_json(value): struct (nullable = true)
| |-- deviceId: string (nullable = true)
| |-- temperature: double (nullable = true)
from_json
函数作为列名出现。
我期待的是:
root
|-- deviceId: string (nullable = true)
|-- temperature: double (nullable = true)
如何解决以上问题?请让我知道我做错了什么。
1条答案
按热度按时间bxjv4tth1#
问题是
alias
的放置,现在,您放置了select
的别名,而不是from_json
的别名。现在,
json.*
不起作用,因为重命名没有按预期工作,因此找不到名为json
的列,也找不到它里面的任何子列。所以,如果你把括号从这里移开:
改为:
最终的数据和模式将如下所示:
这就是你打算做的。希望这对你有帮助,祝你好运!