从字典列表创建pysparkDataframe

m4pnthwp  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(374)

我有下列词典
结果=

[
    {
        "type:"check_datatype",
        "kwargs":{
            "table":"cars","column_name":"vin","d_type":"string"
            }
    },
    {
        "type":"check_emptystring",
        "kwargs":{
            "table":"cars","column_name":"vin"
            }
    },
    {
        "type:"check_null",
        "kwargs":{
            "table":"cars","columns":["vin","index"]
            }
    }
]

我想用下面的模式创建两个不同的pysparkDataframe-

当我们有唯一的(type,kwargs)对时,结果表中的args\u id列将是相同的。这个json必须每天运行,因此如果它再次找到相同的一对(type,kwargs),它应该给出相同的args\u id值。
到现在为止,我已经写了这个代码-

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
check_type_results = [[elt['type']] for elt in results]
checkColumns = ['type']
spark = SparkSession.builder.getOrCreate()
checkResultsDF = spark.createDataFrame(data=check_type_results, schema=checkColumns)
checkResultsDF = checkResultsDF.withColumn("time", F.current_timestamp())
checkResultsDF = checkResultsDF.withColumn("args_id", F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))
checkResultsDF.printSchema()

现在,在我的代码中,我总是以递增的顺序获取args\u id,这对于第一次运行是正确的,但是如果我在第二天再次运行json,或者可能在同一天,并且在json文件中出现了一对(type,kwargs),它已经出现在前面,所以我应该对该对使用相同的args\u id。
如果某对(type,kwargs)在arguments表中没有条目,则只有我将插入arguments表,但如果该对(type,kwargs)已存在于arguments表中,则不应在那里进行插入。
一旦这两个Dataframe被正确填充,那么我想将它们加载到单独的delta表中。
arguments表中的hashcode列是每个“kwargs”的唯一标识符。

c0vxltue

c0vxltue1#

问题

你的模式有点不完整。更详细的模式将允许您利用更多spark特性。使用以下解决方案 spark-sql 以及 pyspark . 与需要有序分区的窗口函数不同,您可以利用一些表生成数组函数,例如 explode 以及 posexplode 提供于 spark-sql . 因为它涉及到写入delta表,所以您可以在这里看到示例

解决方案1:使用sparksql

设置

from pyspark.sql.types import ArrayType,StructType, StructField, StringType, MapType
from pyspark.sql import Row, SparkSession

sparkSession = SparkSession.builder.appName("Demo").getOrCreate()

架构定义

示例记录是一个结构/对象数组,其中 kwargs 是一个 Maptype 带可选键。注意。这个 True 表示可选,当缺少键或不同格式的条目时应提供帮助

schema = StructType([
    StructField("entry",ArrayType(
        StructType([
            StructField("type",StringType(),True),
            StructField("kwargs",MapType(StringType(),StringType()),True)
        ])
    ),True)
])

可复制示例

result_entry =[
    {
        "type":"check_datatype",
        "kwargs":{
            "table":"cars","column_name":"vin","d_type":"string"
            }
    },
    {
        "type":"check_emptystring",
        "kwargs":{
            "table":"cars","column_name":"vin"
            }
    },
    {
        "type":"check_null",
        "kwargs":{
            "table":"cars","columns":["vin","index"]
            }
    }
]

df_results = sparkSession.createDataFrame([Row(entry=result_entry)],schema=schema)
df_results.createOrReplaceTempView("df_results")
df_results.show()

结果

+--------------------+
|               entry|
+--------------------+
|[{check_datatype,...|
+--------------------+

结果表生成

我用过 current_date 但是,您可以根据管道更改当前日期。

results_table = sparkSession.sql("""
WITH raw_results as (
    SELECT 
        posexplode(entry),
        current_date as time
    FROM
        df_results
)
SELECT
    col.type as Type,
    time,
    pos as arg_id
FROM
    raw_results
""")

results_table.show()

结果

+-----------------+----------+------+
|             Type|      time|arg_id|
+-----------------+----------+------+
|   check_datatype|2021-03-31|     0|
|check_emptystring|2021-03-31|     1|
|       check_null|2021-03-31|     2|
+-----------------+----------+------+

参数表生成

args_table = sparkSession.sql("""
WITH raw_results as (
    SELECT 
        posexplode(entry)
    FROM
        df_results
),
raw_arguments AS (
    SELECT
        explode(col.kwargs),
        pos as args_id
    FROM
        raw_results
),
raw_arguments_before_array_check AS (
SELECT
    args_id,
    key as bac_key,
    value as bac_value

FROM
    raw_arguments
),
raw_arguments_after_array_check AS (
SELECT
   args_id,
   bac_key,
   bac_value,
   posexplode(split(regexp_replace(bac_value,"[\\\[\\\]]",""),","))
FROM
   raw_arguments_before_array_check
)
SELECT
    args_id,
    bac_key as key,
    col as value,
    CASE
        WHEN bac_value LIKE '[%' THEN pos
        ELSE NULL
    END as list_index,
    abs(hash(args_id, bac_key,col,pos)) as hashcode
FROM
    raw_arguments_after_array_check
""")

args_table.show()

结果

+-------+-----------+------+----------+----------+
|args_id|        key| value|list_index|  hashcode|
+-------+-----------+------+----------+----------+
|      0|     d_type|string|      null| 216841494|
|      0|column_name|   vin|      null| 502458545|
|      0|      table|  cars|      null|1469121505|
|      1|column_name|   vin|      null| 604007568|
|      1|      table|  cars|      null| 784654488|
|      2|    columns|   vin|         0|1503105124|
|      2|    columns| index|         1| 454389776|
|      2|      table|  cars|      null| 858757332|
+-------+-----------+------+----------+----------+

解决方案2:使用自定义项

您还可以使用已经实现的python逻辑定义用户定义的函数,并将其应用于spark

设置

我们将在这里定义函数来创建结果和参数表。我选择创建生成器类型函数,但这是可选的。

result_entry =[
    {
        "type":"check_datatype",
        "kwargs":{
            "table":"cars","column_name":"vin","d_type":"string"
            }
    },
    {
        "type":"check_emptystring",
        "kwargs":{
            "table":"cars","column_name":"vin"
            }
    },
    {
        "type":"check_null",
        "kwargs":{
            "table":"cars","columns":["vin","index"]
            }
    }
]

import json
result_entry_str = json.dumps(result_entry)
result_entry_str

def extract_results_table(entry,current_date=None):
    if current_date is None:
        from datetime import date
        current_date = str(date.today())
    if type(entry)==str:
        import json
        entry = json.loads(entry)

    for arg_id,arg in enumerate(entry):
        yield {
            "Type":arg["type"],
            "time":current_date,
            "args_id":arg_id
        }

def extract_arguments_table(entry):
    if type(entry)==str:
        import json
        entry = json.loads(entry)

    for arg_id,arg in enumerate(entry):
        if "kwargs" in arg:
            for arg_entry in arg["kwargs"]:
                orig_key,orig_value = arg_entry, arg["kwargs"][arg_entry]
                if type(orig_value)==list:
                    for list_index,value in enumerate(orig_value):
                        yield {
                            "args_id":arg_id,
                            "key":orig_key,
                            "value":value,
                            "list_index":list_index,
                            "hash_code": hash((arg_id,orig_key,value,list_index))
                        }
                else:
                    yield {
                            "args_id":arg_id,
                            "key":orig_key,
                            "value":orig_value,
                            "list_index":None,
                            "hash_code": hash((arg_id,orig_key,orig_value,"null"))
                        }

Pypark设置

from pyspark.sql.functions import udf,col,explode
from pyspark.sql.types import StructType,StructField,IntegerType,StringType, ArrayType

results_table_schema = ArrayType(StructType([
    StructField("Type",StringType(),True),
    StructField("time",StringType(),True),
    StructField("args_id",IntegerType(),True)
]),True)

arguments_table_schema = ArrayType(StructType([
    StructField("args_id",IntegerType(),True),
    StructField("key",StringType(),True),
    StructField("value",StringType(),True),
    StructField("list_index",IntegerType(),True),
    StructField("hash",StringType(),True)
]),True)

extract_results_table_udf = udf(lambda entry,current_date=None : [*extract_results_table(entry,current_date)],results_table_schema)
extract_arguments_table_udf = udf(lambda entry: [*extract_arguments_table(entry)],arguments_table_schema)

# this is useful if you intend to use your functions in spark-sql

sparkSession.udf.register('extract_results_table',extract_results_table_udf)
sparkSession.udf.register('extract_arguments_table',extract_arguments_table_udf)

sparkDataframe

df_results_1 = sparkSession.createDataFrame([Row(entry=result_entry_str)],schema="entry string")
df_results_1.createOrReplaceTempView("df_results_1")
df_results_1.show()

提取结果表


# Using Spark SQL

sparkSession.sql("""
WITH results_table AS (
    select explode(extract_results_table(entry)) as entry FROM df_results_1
)
SELECT entry.* from results_table
""").show()

# Just python

df_results_1.select(
   explode(extract_results_table_udf(df_results_1.entry)).alias("entry")
).selectExpr("entry.*").show()

输出

+-----------------+----------+-------+
|             Type|      time|args_id|
+-----------------+----------+-------+
|   check_datatype|2021-03-31|      0|
|check_emptystring|2021-03-31|      1|
|       check_null|2021-03-31|      2|
+-----------------+----------+-------+

+-----------------+----------+-------+
|             Type|      time|args_id|
+-----------------+----------+-------+
|   check_datatype|2021-03-31|      0|
|check_emptystring|2021-03-31|      1|
|       check_null|2021-03-31|      2|
+-----------------+----------+-------+

提取结果表


# Using spark sql

sparkSession.sql("""
WITH arguments_table AS (
    select explode(extract_arguments_table(entry)) as entry FROM df_results_1
)
SELECT entry.* from arguments_table
""").show()

# Just python

df_results_1.select(
   explode(extract_arguments_table_udf(df_results_1.entry)).alias("entry")
).selectExpr("entry.*").show()

输出

+-------+-----------+------+----------+----+
|args_id|        key| value|list_index|hash|
+-------+-----------+------+----------+----+
|      0|      table|  cars|      null|null|
|      0|column_name|   vin|      null|null|
|      0|     d_type|string|      null|null|
|      1|      table|  cars|      null|null|
|      1|column_name|   vin|      null|null|
|      2|      table|  cars|      null|null|
|      2|    columns|   vin|         0|null|
|      2|    columns| index|         1|null|
+-------+-----------+------+----------+----+

+-------+-----------+------+----------+----+
|args_id|        key| value|list_index|hash|
+-------+-----------+------+----------+----+
|      0|      table|  cars|      null|null|
|      0|column_name|   vin|      null|null|
|      0|     d_type|string|      null|null|
|      1|      table|  cars|      null|null|
|      1|column_name|   vin|      null|null|
|      2|      table|  cars|      null|null|
|      2|    columns|   vin|         0|null|
|      2|    columns| index|         1|null|
+-------+-----------+------+----------+----+

参考

spark sql函数
增量批写入

相关问题