我有下列词典
结果=
[
{
"type:"check_datatype",
"kwargs":{
"table":"cars","column_name":"vin","d_type":"string"
}
},
{
"type":"check_emptystring",
"kwargs":{
"table":"cars","column_name":"vin"
}
},
{
"type:"check_null",
"kwargs":{
"table":"cars","columns":["vin","index"]
}
}
]
我想用下面的模式创建两个不同的pysparkDataframe-
当我们有唯一的(type,kwargs)对时,结果表中的args\u id列将是相同的。这个json必须每天运行,因此如果它再次找到相同的一对(type,kwargs),它应该给出相同的args\u id值。
到现在为止,我已经写了这个代码-
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
check_type_results = [[elt['type']] for elt in results]
checkColumns = ['type']
spark = SparkSession.builder.getOrCreate()
checkResultsDF = spark.createDataFrame(data=check_type_results, schema=checkColumns)
checkResultsDF = checkResultsDF.withColumn("time", F.current_timestamp())
checkResultsDF = checkResultsDF.withColumn("args_id", F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))
checkResultsDF.printSchema()
现在,在我的代码中,我总是以递增的顺序获取args\u id,这对于第一次运行是正确的,但是如果我在第二天再次运行json,或者可能在同一天,并且在json文件中出现了一对(type,kwargs),它已经出现在前面,所以我应该对该对使用相同的args\u id。
如果某对(type,kwargs)在arguments表中没有条目,则只有我将插入arguments表,但如果该对(type,kwargs)已存在于arguments表中,则不应在那里进行插入。
一旦这两个Dataframe被正确填充,那么我想将它们加载到单独的delta表中。
arguments表中的hashcode列是每个“kwargs”的唯一标识符。
1条答案
按热度按时间c0vxltue1#
问题
你的模式有点不完整。更详细的模式将允许您利用更多spark特性。使用以下解决方案
spark-sql
以及pyspark
. 与需要有序分区的窗口函数不同,您可以利用一些表生成数组函数,例如explode
以及posexplode
提供于spark-sql
. 因为它涉及到写入delta表,所以您可以在这里看到示例解决方案1:使用sparksql
设置
架构定义
示例记录是一个结构/对象数组,其中
kwargs
是一个Maptype
带可选键。注意。这个True
表示可选,当缺少键或不同格式的条目时应提供帮助可复制示例
结果
结果表生成
我用过
current_date
但是,您可以根据管道更改当前日期。结果
参数表生成
结果
解决方案2:使用自定义项
您还可以使用已经实现的python逻辑定义用户定义的函数,并将其应用于spark
设置
我们将在这里定义函数来创建结果和参数表。我选择创建生成器类型函数,但这是可选的。
Pypark设置
sparkDataframe
提取结果表
输出
提取结果表
输出
参考
spark sql函数
增量批写入