pyspark比较Dataframe中的记录并查找每列的增量

7tofc5zh  于 2021-05-18  发布在  Spark
关注(0)|答案(0)|浏览(277)

我有两个Dataframe,fulldata和deltadata,它们需要比较,并且需要基于以下规则的输出
如果两个列中的列值相同,则它不应出现在输出–ex emp1中
当增量数据发生更改时,更改的列以及在强制字段查找中定义的强制列应包含在输出中–例如emp2,其中更改的列“field2”和强制列“雇员编号、记录类型、姓氏、名字”,将出生日期添加到输出中(字段1和字段3应为空,因为没有更改)
如果强制列发生更改,则应包括该记录–例如emp3,其中更改的列为姓氏和名字
应包括delta中的新记录–例如emp5
任何更改为空字符串的列都应硬编码为“(blank)”–例如emp6
我有一个工作代码来提供所需的输出,但性能很慢-大约需要10分钟的500条记录,250列。我在一个连接到glue dev端点的juypter笔记本中运行这段代码,配置如下。工人数量15工人类型g.2x数据处理单元(DPU)31这能以更有效的方式实现吗?感谢您的帮助。

from pyspark.sql.types import StructType, StringType, StructField
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from operator import add
from functools import reduce

spark = SparkContext.getOrCreate()
sql_context = SQLContext(spark)
spark_session = SparkSession.builder.enableHiveSupport().getOrCreate()

MANDATORY_FIELDS_LOOKUP = {
    "10PERDET": [
        "surname",
        "first_forename",
        "date_of_birth",
    ]
}

def delta_compare(full_df: DataFrame, delta_df: DataFrame, record_header:str, join_key: list) -> DataFrame:
    all_fields = full_df.columns
    mandatory_fields = MANDATORY_FIELDS_LOOKUP.get(record_header)
    delta_df = delta_df.withColumn("isDelta", lit(1))
    full_df = full_df.withColumn("isDelta", lit(0))
    union_df = full_df.union(delta_df)

    window_spec = Window.partitionBy([col(k) for k in join_key]).orderBy(col("isDelta"))

    df = (
        union_df.withColumn(
            "isChanged",
            reduce(
                add,
                [
                    when(col(fld) == lag(col(fld)).over(window_spec), 0).otherwise(1)
                    for fld in all_fields
                    if fld not in ["record_type", *join_key, "isDelta"]
                ],
            ),
        )
        .select(
            *join_key,
            col("record_type"),
            *mandatory_fields,
            *[
                when(col(fld) == lag(col(fld)).over(window_spec), "")
                .when(
                    (col(fld) != lag(col(fld)).over(window_spec))
                    & (col(fld) == ""),
                    "(blank)"
                )
                .otherwise(col(fld))
                .alias(fld)
                for fld in all_fields
                if fld not in ["record_type", *join_key, "isDelta"] + mandatory_fields
            ],
        )
        .filter(col("isDelta") == 1)
        .filter(col("isChanged") != 0)
    )
    return df.select(*[fld for fld in all_fields]).orderBy(*join_key)

schema = (StructType([StructField("employee_number", StringType())
                   ,StructField("record_type", StringType())
                   ,StructField("surname", StringType())
                   ,StructField("first_forename", StringType())
                   ,StructField("date_of_birth", StringType())                      
                   ,StructField("field1", StringType())
                   ,StructField("field2", StringType())
                   ,StructField("field3", StringType())])
         )
fulldata = [['emp1', "10PERDET", 'Neil', 'Par', '16011980', '10', '20', '30'],
            ['emp2', "10PERDET", 'Tom', 'Hanks', '11091982', '15', '25', '35'],
            ['emp3', "10PERDET", 'jag', 'ram', '26121981', '17', '27', '37'],
            ['emp4', "10PERDET", 'right', 'sam', '26121990', 'oldrow', '99', '88'],
            ['emp6', "10PERDET", 'coke', 'john', '01021985', '29', '39', '49'],
]
full_df = sql_context.createDataFrame(fulldata,schema=schema)

deltadata = [['emp1', "10PERDET", 'Neil', 'Par', '16011980', '10', '20', '30'],
             ['emp2', "10PERDET", 'Tom', 'Hanks', '11091982', '15', 'new', '35'],
             ['emp3', "10PERDET", 'jag_new', 'ram_new', '26121981', 'new', '27', '37'],
             ['emp5', "10PERDET", 'newjohn', 'gan', '22022020', 'newrow', '01', '02'],
             ['emp6', "10PERDET", 'coke', 'john', '01021985', '29', '', '49'],
]
delta_df = sql_context.createDataFrame(deltadata,schema=schema)
final_df = delta_compare(full_df, delta_df, "10PERDET", ['employee_number'])
final_df.show()

输出:

+---------------+-----------+-------+--------------+-------------+------+-------+------+
|employee_number|record_type|surname|first_forename|date_of_birth|field1| field2|field3|
+---------------+-----------+-------+--------------+-------------+------+-------+------+
|           emp2|   10PERDET|    Tom|         Hanks|     11091982|      |    new|      |
|           emp3|   10PERDET|jag_new|       ram_new|     26121981|   new|       |      |
|           emp5|   10PERDET|newjohn|           gan|     22022020|newrow|     01|    02|
|           emp6|   10PERDET|   coke|          john|     01021985|      |(blank)|      |
+---------------+-----------+-------+--------------+-------------+------+-------+------+

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题