Spark from_json -如何处理损坏的记录

euoag5mw  于 2023-03-19  发布在  Apache
关注(0)|答案(1)|浏览(123)

我有一个用例,我从表中读取数据,并通过指定模式,使用from_json()将一个字符串列解析为另一个字符串列:

from pyspark.sql.functions import from_json, col

spark = SparkSession.builder.appName("FromJsonExample").getOrCreate()

input_df = spark.sql("SELECT * FROM input_table")

json_schema = "struct<field1:string, field2:integer>"

output_df = input_df.withColumn("parsed_json", from_json(col("json_column"), json_schema))

output_df.show()

我正在尝试实现此列的错误处理。如果记录损坏,我想让该列为空,但我希望有另一个错误列,其中包含任何可以表明我的记录已损坏,以便我可以在以后过滤此记录。阅读文档后,有点不清楚此函数支持什么。文档中指出,您可以配置“选项”与json数据源相同(“控制解析的选项。接受与json数据源相同的选项”)但是直到尝试将“PERMISSIVE”模式与“columnNameOfCorruptRecord”一起使用之前,它不会在记录损坏的情况下生成新列。搜索结果并不多,但据我所知(尽管未经测试或确认)Databricks文档指定您可以使用此设置生成错误列(https://docs.databricks.com/sql/language-manual/functions/from_json.html)。
需要说明的是,我没有使用Databricks,但据我所知,该公司是由Apache Spark Foundation创建的,因此我的期望是使用/提供您可以在任何地方使用的相同工具。此外,我对使用“from_json”的特定用例感兴趣,而不是使用“read.json()”阅读数据并在那里配置选项,因为这在我的用例中是不可能的。
以下问题仍然存在:
1.如何处理“from_json”方法的错误?如果发生解析错误,是否有其他方法可以创建列?
1.由于“from_json()”的“选项”似乎不支持“DROPMALFORMED”配置,因此是否有方法删除格式错误的记录。之后无法通过空列进行检查,因为它可能在处理之前已经为空。

9lowa7mx

9lowa7mx1#

示例 Dataframe (df_1):

+---------------------------+
|json_column                |
+---------------------------+
|{'field1':'A', 'field2':1} |
|{'field1':'B', 'field2':2} |
|{'field1': C, 'field2':'3'}|
+---------------------------+

导入必要的包:

from pyspark.sql.functions import from_json, col, when

1.使用columnNameOfCorruptRecord选项跟踪错误记录

json_options = {"columnNameOfCorruptRecord":"corrupt_json"}
json_schema = "struct<field1:string, field2:integer, corrupt_json:string>"

df_2 = input_df.withColumn(
    "parsed_json",
    from_json(
        col("json_column"),
        schema = json_schema,
        options = json_options
    )
)

1.创建一个新列corrupt_json并删除parsed_json中的corrupt_json字段

df_3 = df_2 \
    .withColumn("corrupt_json", col("parsed_json.corrupt_json")) \
    .withColumn("parsed_json", col("parsed_json").dropFields("corrupt_json"))

1.使用null值更新parsed_json中损坏的记录

df_4 = df_3.withColumn("parsed_json", when(col("corrupt_json").isNotNull(), None).otherwise(col("parsed_json")))

1.删除损坏的记录

df_5 = df_4.filter(col("parsed_json").isNotNull()).drop("parsed_json","corrupt_json")
df_5.show()

输出

+--------------------------+
|json_column               |
+--------------------------+
|{'field1':'A', 'field2':1}|
|{'field1':'B', 'field2':2}|
+--------------------------+

相关问题