Reading a column containing JSON strings from BigQuery with Spark Java

kqlmhetl  posted on 2022-11-16  in Apache

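I have a table in BigQuery that I am reading with Spark Java. However, when I try to access a column that holds a nested JSON value, I cannot get from_json to work.
The "sender" column has the following structure: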

{"UserInfo":{"CorporateEmailAddress":"email@gmail.com","UUID":32341983,"FirstName":"John","FirmNumber":678,"PersonalEmailAddress":"email@gmail.com","LastName":"Doe","AccountName":"AccountName","AccountNumber":12345}}

I created a nested StructType as follows:

StructType userInfo = new StructType();
    userInfo.add("CorporateEmailAddress", DataTypes.StringType, false);
    userInfo.add("UUID", DataTypes.IntegerType, false);
    userInfo.add("FirstName", DataTypes.StringType, false);
    userInfo.add("FirmNumber", DataTypes.IntegerType, false);
    userInfo.add("PersonalEmailAddress", DataTypes.StringType, false);
    userInfo.add("LastName", DataTypes.StringType, false);
    userInfo.add("AccountName", DataTypes.StringType, false);
    userInfo.add("AccountNumber", DataTypes.IntegerType, false);

    StructType schema = new StructType();
    schema = schema.add("UserInfo", userInfo, false);

Then, in Spark, I use withColumn and from_json with my schema to create a column through which I can access the nested fields of the JSON string.

SparkSession spark = SparkSession.builder()
            .appName("spark-bigquery-pipeline")
            .getOrCreate();

    Dataset<Row> df = spark.read().format("bigquery")
            .option("table", "table-email-data").load();

   Dataset<Row> jsonColumnDataset = df.withColumn("jsonCol",functions.from_json(df.col("sender"), schema));
    jsonColumnDataset.printSchema();

    jsonColumnDataset.select("jsonCol").show();

However, when I run this code, my output is:

+-------+
|jsonCol|
+-------+
|   {{}}|
|   {{}}|
|   {{}}|
|   {{}}|
|   {{}}|
|   {{}}|
|   {{}}|
|   {{}}|
|   {{}}|
|   {{}}|

So it looks like I am not parsing the JSON correctly and am creating an empty column?
Edit: here is the output of jsonColumnDataset.printSchema():

root
 |-- message_ID: string (nullable = false)
 |-- msg_time: string (nullable = false)
 |-- msg_time_UTC: string (nullable = false)
 |-- msg_lang: string (nullable = false)
 |-- subject: string (nullable = false)
 |-- msg_body: string (nullable = false)
 |-- disc_ref: string (nullable = true)
 |-- greeting: string (nullable = true)
 |-- sender: string (nullable = false)
 |-- recipient: string (nullable = false)
 |-- attachment: string (nullable = true)
 |-- jsonCol: struct (nullable = true)
 |    |-- UserInfo: struct (nullable = true)

Can anyone see what is wrong with my approach?

4c8rllxm  #1

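The problem is probably in the schema structure you created.
Not all of the JSON fields were actually created as StructFields. StructType is immutable and add returns a new StructType, so the return values of the userInfo.add(...) calls in the question are discarded and userInfo stays empty. That matches the empty UserInfo struct in the printSchema() output above.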

Schema created
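
As a minimal sketch of a schema in which every JSON field ends up as a StructField, the add calls can be chained so that each returned StructType is kept. The class name SenderSchema and the build() method are hypothetical names used only for this illustration, and this is not necessarily the exact code the answer used.

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical helper class, named only for this sketch.
public class SenderSchema {

    // Builds the schema for the JSON string stored in the "sender" column.
    public static StructType build() {
        // StructType is immutable: add(...) returns a new StructType,
        // so the calls are chained and the final result is what gets used.
        StructType userInfo = new StructType()
                .add("CorporateEmailAddress", DataTypes.StringType, false)
                .add("UUID", DataTypes.IntegerType, false)
                .add("FirstName", DataTypes.StringType, false)
                .add("FirmNumber", DataTypes.IntegerType, false)
                .add("PersonalEmailAddress", DataTypes.StringType, false)
                .add("LastName", DataTypes.StringType, false)
                .add("AccountName", DataTypes.StringType, false)
                .add("AccountNumber", DataTypes.IntegerType, false);

        // Wrap the inner struct under the top-level "UserInfo" key.
        return new StructType().add("UserInfo", userInfo, false);
    }

    public static void main(String[] args) {
        // Prints the schema tree so the nested fields can be checked by eye.
        build().printTreeString();
    }
}
```

The result of SenderSchema.build() can be passed as the schema argument of functions.from_json in the snippet below.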

Printing the data

Dataset<Row> jsonColumnDataset = customerDF.withColumn("jsonCol", functions.from_json(customerDF.col("sender"), schema));

jsonColumnDataset.select("jsonCol").show(false);

Output

+---------------------------------------------------------------------------+
|jsonCol                                                                    |
+---------------------------------------------------------------------------+
|[[email@gmail.com,32341983,John,678,email@gmail.com,Doe,AccountName,12345]]|
+---------------------------------------------------------------------------+

Printing the new schema

jsonColumnDataset.printSchema();
root
 |-- sender: string (nullable = true)
 |-- jsonCol: struct (nullable = true)
 |    |-- UserInfo: struct (nullable = false)
 |    |    |-- CorporateEmailAddress: string (nullable = false)
 |    |    |-- UUID: integer (nullable = false)
 |    |    |-- FirstName: string (nullable = false)
 |    |    |-- FirmNumber: integer (nullable = false)
 |    |    |-- PersonalEmailAddress: string (nullable = false)
 |    |    |-- LastName: string (nullable = false)
 |    |    |-- AccountName: string (nullable = false)
 |    |    |-- AccountNumber: integer (nullable = false)

Accessing individual fields from the newly added JSON column

jsonColumnDataset.select("jsonCol.UserInfo.CorporateEmailAddress").show(false);
jsonColumnDataset.select("jsonCol.UserInfo.UUID").show(false);

Output

+---------------------+
|CorporateEmailAddress|
+---------------------+
|email@gmail.com      |
+---------------------+

+--------+
|UUID    |
+--------+
|32341983|
+--------+
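
Instead of selecting the nested fields one at a time, they can also be promoted to top-level columns in a single select using the star syntax on the struct. The following is a rough sketch under stated assumptions: the class FlattenSender, its methods, the local[1] master and the in-memory one-row dataset are all made up for illustration (the question reads from BigQuery instead), and the schema covers only two of the fields to keep the example short.

```java
import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical class, named only for this sketch.
public class FlattenSender {

    // Parses the JSON string in "sender" and turns every field of
    // jsonCol.UserInfo into its own top-level column.
    public static Dataset<Row> flatten(Dataset<Row> df, StructType schema) {
        return df
                .withColumn("jsonCol", functions.from_json(df.col("sender"), schema))
                .select("jsonCol.UserInfo.*");
    }

    // Assumption: a two-field subset of the full schema; JSON keys that
    // are not listed in the schema are simply not extracted.
    public static StructType partialSchema() {
        StructType userInfo = new StructType()
                .add("CorporateEmailAddress", DataTypes.StringType, true)
                .add("UUID", DataTypes.IntegerType, true);
        return new StructType().add("UserInfo", userInfo, true);
    }

    public static void main(String[] args) {
        // Local session for experimentation only.
        SparkSession spark = SparkSession.builder()
                .appName("flatten-sender-sketch")
                .master("local[1]")
                .getOrCreate();

        // One-row stand-in for the BigQuery table, with a shortened sample value.
        String json = "{\"UserInfo\":{\"CorporateEmailAddress\":\"email@gmail.com\",\"UUID\":32341983}}";
        Dataset<Row> df = spark
                .createDataset(Collections.singletonList(json), Encoders.STRING())
                .toDF("sender");

        flatten(df, partialSchema()).show(false);
        spark.stop();
    }
}
```

With the full schema in place of partialSchema(), the same select would yield one column per UserInfo field.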
