如何从pysparksql上下文查询数据如果json fie中不存在键,如何捕获并给出sql分析执行选项

doinxwow  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(277)

我使用pyspark在Dataframe中转换json。我成功地改造了它。但我面临的问题是,有一个键将出现在某个json文件中,而不会出现在另一个json文件中。当我用pysparksql上下文展平json,并且某个json文件中不存在键时,它会在创建pysparkDataframe时出错,引发sql分析异常。
例如,我的示例json

{
    "_id" : ObjectId("5eba227a0bce34b401e7899a"),
    "origin" : "inbound",
    "converse" : "72412952",
    "Start" : "2020-04-20T06:12:20.89Z",
    "End" : "2020-04-20T06:12:53.919Z",
    "ConversationMos" : 4.88228940963745,
    "ConversationRFactor" : 92.4383773803711,
    "participantId" : "bbe4de4c-7b3e-49f1-8",
}

上述json参与者id将在某些json文件中可用,而在另一个json文件中不可用
my pysaprk代码段:

fetchFile = sark.read.format(file_type)\
                .option("inferSchema", "true")\
                .option("header","true")\
                .load(generated_FileLocation)

fetch file.registerTempTable("CreateDataFrame")
tempData = sqlContext.sql("select origin,converse,start,end,participantId from CreateDataFrame")

当,在某个json文件中 participantId 如果不存在,就会出现异常。如何处理这样一种异常:如果键不存在,那么列将包含null或任何其他方法来处理它

gmxoilav

gmxoilav1#

我认为您调用spark是为了一次读取一个文件,同时推断模式。
spark在sql分析异常中告诉您的是,您的文件和推断的模式没有您要查找的密钥。你要做的就是找到好的模式,并将它应用到你想要处理的所有文件中。理想情况下,一次处理所有文件。
有三种策略:
从大量文件中推断出你的模式。你应该得到所有钥匙的总和。spark将对数据进行两次遍历。

df = spark.read.json('/path/to/your/directory/full/of/json/files')
schema = df.schema
print(schema)

创建一个schema对象我觉得这样做很乏味,但是会加快代码的速度。以下是参考资料:https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.types.structtype
从格式良好的文件中读取模式,然后使用它来读取整个目录。另外,通过打印schema对象,可以将其复制粘贴回选项2的代码中。

schema = spark.read.json('path/to/well/formed/file.json')
print(schema)
my_df = spark.read.schema(schema).json('path/to/entire/folder/full/of/json')
rqmkfv5c

rqmkfv5c2#

您可以简单地检查列是否不存在,然后添加它将清空值。相同的代码如下:

from pyspark.sql import functions as f
fetchFile = sark.read.format(file_type)\
                .option("inferSchema", "true")\
                .option("header","true")\
                .load(generated_FileLocation)

if not 'participantId' in df.columns:
   df = df.withColumn('participantId', f.lit(''))

fetch file.registerTempTable("CreateDataFrame")
tempData = sqlContext.sql("select origin,converse,start,end,participantId from CreateDataFrame")

相关问题