我使用pyspark在Dataframe中转换json。我成功地改造了它。但我面临的问题是,有一个键将出现在某个json文件中,而不会出现在另一个json文件中。当我用pysparksql上下文展平json,并且某个json文件中不存在键时,它会在创建pysparkDataframe时出错,引发sql分析异常。
例如,我的示例json
{
"_id" : ObjectId("5eba227a0bce34b401e7899a"),
"origin" : "inbound",
"converse" : "72412952",
"Start" : "2020-04-20T06:12:20.89Z",
"End" : "2020-04-20T06:12:53.919Z",
"ConversationMos" : 4.88228940963745,
"ConversationRFactor" : 92.4383773803711,
"participantId" : "bbe4de4c-7b3e-49f1-8",
}
上述json参与者id将在某些json文件中可用,而在另一个json文件中不可用
my pysaprk代码段:
fetchFile = sark.read.format(file_type)\
.option("inferSchema", "true")\
.option("header","true")\
.load(generated_FileLocation)
fetch file.registerTempTable("CreateDataFrame")
tempData = sqlContext.sql("select origin,converse,start,end,participantId from CreateDataFrame")
当,在某个json文件中 participantId
如果不存在,就会出现异常。如何处理这样一种异常:如果键不存在,那么列将包含null或任何其他方法来处理它
2条答案
按热度按时间gmxoilav1#
我认为您调用spark是为了一次读取一个文件,同时推断模式。
spark在sql分析异常中告诉您的是,您的文件和推断的模式没有您要查找的密钥。你要做的就是找到好的模式,并将它应用到你想要处理的所有文件中。理想情况下,一次处理所有文件。
有三种策略:
从大量文件中推断出你的模式。你应该得到所有钥匙的总和。spark将对数据进行两次遍历。
创建一个schema对象我觉得这样做很乏味,但是会加快代码的速度。以下是参考资料:https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.types.structtype
从格式良好的文件中读取模式,然后使用它来读取整个目录。另外,通过打印schema对象,可以将其复制粘贴回选项2的代码中。
rqmkfv5c2#
您可以简单地检查列是否不存在,然后添加它将清空值。相同的代码如下: