I have two DataFrames, fulldata and deltadata, that need to be compared, and the output has to be built according to the following rules:

- If the column values are the same in both, the record should not appear in the output – e.g. emp1.
- When a column changes in the delta data, the changed column plus the mandatory columns defined in the mandatory-fields lookup should be included in the output – e.g. emp2, where the changed column "field2" and the mandatory columns "employee_number, record_type, surname, first_forename, date_of_birth" are added to the output (field1 and field3 should be blank, since they did not change).
- If a mandatory column changes, the record should be included – e.g. emp3, where the changed columns are surname and first_forename.
- New records in the delta should be included – e.g. emp5.
- Any column that changes to an empty string should be hard-coded to "(blank)" – e.g. emp6.

I have working code that produces the required output, but performance is slow: it takes about 10 minutes for 500 records with 250 columns. I am running this code in a Jupyter notebook attached to a Glue dev endpoint with the following configuration: Number of workers: 15, Worker type: G.2X, Data processing units (DPUs): 31. Can this be done more efficiently? Any help is appreciated.
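A hedged aside (my addition, not from the original post): with roughly 250 columns, the select in delta_compare below builds several lag() window expressions per column, so the generated query plan itself can become very large, and plan analysis on the driver is a plausible bottleneck at this small row count. Spark's standard explain() call is a cheap way to check this once final_df (defined below) exists:

# Print the full set of plans for the comparison result; a plan that runs
# to thousands of lines suggests the per-column window expressions, not the
# 500 rows of data, are where the time goes.
final_df.explain(extended=True)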
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, DataFrame, SQLContext
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from operator import add
from functools import reduce

sc = SparkContext.getOrCreate()
sql_context = SQLContext(sc)
spark_session = SparkSession.builder.enableHiveSupport().getOrCreate()
# Mandatory columns that must always appear in the output for a record type.
MANDATORY_FIELDS_LOOKUP = {
    "10PERDET": [
        "surname",
        "first_forename",
        "date_of_birth",
    ]
}
def delta_compare(full_df: DataFrame, delta_df: DataFrame, record_header: str, join_key: list) -> DataFrame:
    all_fields = full_df.columns
    mandatory_fields = MANDATORY_FIELDS_LOOKUP.get(record_header)
    # Tag each row with its origin and stack the two DataFrames, so a window
    # partitioned by the key sees the full row (isDelta=0) before the delta row.
    delta_df = delta_df.withColumn("isDelta", lit(1))
    full_df = full_df.withColumn("isDelta", lit(0))
    union_df = full_df.union(delta_df)
    window_spec = Window.partitionBy([col(k) for k in join_key]).orderBy(col("isDelta"))
    df = (
        # isChanged counts the columns whose value differs from the previous
        # (full) row; lag() is null for brand-new keys, so every column counts.
        union_df.withColumn(
            "isChanged",
            reduce(
                add,
                [
                    when(col(fld) == lag(col(fld)).over(window_spec), 0).otherwise(1)
                    for fld in all_fields
                    if fld not in ["record_type", *join_key, "isDelta"]
                ],
            ),
        )
        .select(
            *join_key,
            col("record_type"),
            *mandatory_fields,
            *[
                # Unchanged -> "", changed to an empty string -> "(blank)",
                # otherwise keep the new value.
                when(col(fld) == lag(col(fld)).over(window_spec), "")
                .when(
                    (col(fld) != lag(col(fld)).over(window_spec)) & (col(fld) == ""),
                    "(blank)",
                )
                .otherwise(col(fld))
                .alias(fld)
                for fld in all_fields
                if fld not in ["record_type", *join_key, "isDelta"] + mandatory_fields
            ],
        )
        # Keep only the delta rows that actually changed.
        .filter(col("isDelta") == 1)
        .filter(col("isChanged") != 0)
    )
    return df.select(*all_fields).orderBy(*join_key)
schema = StructType([
    StructField("employee_number", StringType()),
    StructField("record_type", StringType()),
    StructField("surname", StringType()),
    StructField("first_forename", StringType()),
    StructField("date_of_birth", StringType()),
    StructField("field1", StringType()),
    StructField("field2", StringType()),
    StructField("field3", StringType()),
])

fulldata = [
    ["emp1", "10PERDET", "Neil", "Par", "16011980", "10", "20", "30"],
    ["emp2", "10PERDET", "Tom", "Hanks", "11091982", "15", "25", "35"],
    ["emp3", "10PERDET", "jag", "ram", "26121981", "17", "27", "37"],
    ["emp4", "10PERDET", "right", "sam", "26121990", "oldrow", "99", "88"],
    ["emp6", "10PERDET", "coke", "john", "01021985", "29", "39", "49"],
]
full_df = sql_context.createDataFrame(fulldata, schema=schema)

deltadata = [
    ["emp1", "10PERDET", "Neil", "Par", "16011980", "10", "20", "30"],
    ["emp2", "10PERDET", "Tom", "Hanks", "11091982", "15", "new", "35"],
    ["emp3", "10PERDET", "jag_new", "ram_new", "26121981", "new", "27", "37"],
    ["emp5", "10PERDET", "newjohn", "gan", "22022020", "newrow", "01", "02"],
    ["emp6", "10PERDET", "coke", "john", "01021985", "29", "", "49"],
]
delta_df = sql_context.createDataFrame(deltadata, schema=schema)

final_df = delta_compare(full_df, delta_df, "10PERDET", ["employee_number"])
final_df.show()
Output:
+---------------+-----------+-------+--------------+-------------+------+-------+------+
|employee_number|record_type|surname|first_forename|date_of_birth|field1| field2|field3|
+---------------+-----------+-------+--------------+-------------+------+-------+------+
| emp2| 10PERDET| Tom| Hanks| 11091982| | new| |
| emp3| 10PERDET|jag_new| ram_new| 26121981| new| | |
| emp5| 10PERDET|newjohn| gan| 22022020|newrow| 01| 02|
| emp6| 10PERDET| coke| john| 01021985| |(blank)| |
+---------------+-----------+-------+--------------+-------------+------+-------+------+
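For comparison, below is a join-based sketch of the same logic (my assumption, untested at scale: the join key uniquely identifies a record in each DataFrame; the name delta_compare_join and the old_ column prefix are mine, not from the original code). Joining the old and new rows side by side lets every column be compared with a single equality instead of several window expressions per column, which avoids the window sort entirely and should keep the query plan much smaller:

def delta_compare_join(full_df: DataFrame, delta_df: DataFrame, record_header: str, join_key: list) -> DataFrame:
    all_fields = delta_df.columns
    mandatory_fields = MANDATORY_FIELDS_LOOKUP.get(record_header, [])
    compare_fields = [f for f in all_fields if f not in ["record_type", *join_key]]
    # Prefix the full-load columns so old and new values sit side by side.
    old_df = full_df.select(*join_key, *[col(f).alias("old_" + f) for f in compare_fields])
    joined = delta_df.join(old_df, on=join_key, how="left")
    # A delta row is kept if any compared column differs; eqNullSafe() makes
    # brand-new keys (all old_* columns null) count as changed.
    is_changed = reduce(
        add,
        [when(col(f).eqNullSafe(col("old_" + f)), 0).otherwise(1) for f in compare_fields],
    )
    out_cols = [
        col(f)
        if f in mandatory_fields
        # Unchanged -> "", changed to an empty string -> "(blank)", else new value.
        else when(col(f).eqNullSafe(col("old_" + f)), "")
        .when(col("old_" + f).isNotNull() & (col(f) == ""), "(blank)")
        .otherwise(col(f))
        .alias(f)
        for f in compare_fields
    ]
    return (
        joined.withColumn("isChanged", is_changed)
        .filter(col("isChanged") != 0)
        .select(*join_key, col("record_type"), *out_cols)
        .select(*all_fields)
        .orderBy(*join_key)
    )

# Same call shape as the original:
# final_df = delta_compare_join(full_df, delta_df, "10PERDET", ["employee_number"])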