我有一个用例,我从表中读取数据,并通过指定模式,使用from_json()
将一个字符串列解析为另一个字符串列:
from pyspark.sql.functions import from_json, col
spark = SparkSession.builder.appName("FromJsonExample").getOrCreate()
input_df = spark.sql("SELECT * FROM input_table")
json_schema = "struct<field1:string, field2:integer>"
output_df = input_df.withColumn("parsed_json", from_json(col("json_column"), json_schema))
output_df.show()
我正在尝试实现此列的错误处理。如果记录损坏,我想让该列为空,但我希望有另一个错误列,其中包含任何可以表明我的记录已损坏,以便我可以在以后过滤此记录。阅读文档后,有点不清楚此函数支持什么。文档中指出,您可以配置“选项”与json数据源相同(“控制解析的选项。接受与json数据源相同的选项”)但是直到尝试将“PERMISSIVE”模式与“columnNameOfCorruptRecord”一起使用之前,它不会在记录损坏的情况下生成新列。搜索结果并不多,但据我所知(尽管未经测试或确认)Databricks文档指定您可以使用此设置生成错误列(https://docs.databricks.com/sql/language-manual/functions/from_json.html)。
需要说明的是,我没有使用Databricks,但据我所知,该公司是由Apache Spark Foundation创建的,因此我的期望是使用/提供您可以在任何地方使用的相同工具。此外,我对使用“from_json”的特定用例感兴趣,而不是使用“read.json()”阅读数据并在那里配置选项,因为这在我的用例中是不可能的。
以下问题仍然存在:
1.如何处理“from_json”方法的错误?如果发生解析错误,是否有其他方法可以创建列?
1.由于“from_json()”的“选项”似乎不支持“DROPMALFORMED”配置,因此是否有方法删除格式错误的记录。之后无法通过空列进行检查,因为它可能在处理之前已经为空。
1条答案
按热度按时间9lowa7mx1#
示例 Dataframe (df_1):
导入必要的包:
1.使用
columnNameOfCorruptRecord
选项跟踪错误记录1.创建一个新列
corrupt_json
并删除parsed_json
中的corrupt_json字段1.使用
null
值更新parsed_json
中损坏的记录1.删除损坏的记录
输出