PySpark:如何处理字段的不一致JSON数据类型

kq0g1dla  于 12个月前  发布在  Spark
关注(0)|答案(1)|浏览(129)

我有一个很大的JSON字符串存储在delta表的一个列中。我使用结构化流进行读取和写入。我有一个讨厌的嵌套字段'value',它有两种类型:string,number,有时是null。因为它有时是字符串,有时是数字,我无法找到一种方法来处理和解析它,而不会使其中一个成为null值。示例:
| json_col|
| --|
| 第一个月|
| {"value": 456}个|
| {"value": null}个|
如果我将schema指定为:

schema = StructType([StructField('value', StringType())])

字符串
字符串会被解析,但数字不会被转换,它们只是变成了更多的null值。在上面的场景中,第二行的值只是null。如果我将schema指定为LongType,则会发生相反的情况;原始数据中的字符串变成null。有没有一种方法可以有效地处理这个问题,而不需要逐行修改负载?我检查了json流阅读器的文档,寻找一个函数,但没有看到任何预构建的方法来处理这个问题。

xxslljrj

xxslljrj1#

在pyspark版本3.5.0中,当我在批处理和流处理版本中指定StringType()时,我没有发现数字被空。

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import *
import sys
from pyspark.sql.functions import when, max, greatest

sc = SparkContext('local')
sqlContext = SQLContext(sc)

inputFile = "../data/unionDataTypeDir"
schema_provided = StructType([StructField("value", StringType(), nullable=True)])

initialDF = sqlContext.read.schema(schema_provided).json(inputFile)

print("pyspark version :", sc.version)

print("Normal batch dataframe")
initialDF.show(n=100, truncate=False)

streaming_df = sqlContext.readStream \
    .schema(schema_provided) \
    .json(inputFile)

print("Streaming dataframe")
query = streaming_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()

字符串
输出量:

pyspark version : 3.5.0
Normal batch dataframe
+-----+
|value|
+-----+
|123  |
|456  |
|NULL |
+-----+

Streaming dataframe
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+
|value|
+-----+
|  123|
|  456|
| NULL|
+-----+

相关问题