I am converting a bunch of applications to a different technology. As part of that, the old data and the new data need to be compared. The idea of this post is a generic framework that can compare any two inputs. For every column I also produce a flag that says whether it changed.
There are two kinds of comparison: one with keys and one without keys.
I need this generic framework to handle any number of records, up to about 3.5 billion rows. The maximum data volume is about 400 GB; inputs range from 1 GB to 400 GB.
Another common case is that every dataset has a loc or itm column and the data is evenly distributed across those values, but the data stored in HDFS is not partitioned that way. The approach below works for medium record volumes, but it fails when comparing 80 GB of data.
First, the source and target datasets are read:
if SourceConType == 'hdfs' and SourceFileFormat.lower() == 'parquet':
    df_sourceFile = spark.read.parquet(SourceFile)
elif SourceConType == 'hive':
    df_sourceFile = spark.sql(source_sql)
elif SourceConType == 'hdfs':
    if SourceFileFormat.lower() == 'parquet':
        df_sourceFile = spark.read.parquet(SourceFile)
    elif SourceFileFormat.lower() == 'orc':
        df_sourceFile = spark.read.orc(SourceFile)
    elif SourceFileFormat.lower() in ('csv', 'text'):
        df_sourceFile = spark.read.csv(SourceFile, header=True, sep=SourceFile_Sep, nullValue="")
    else:
        print("Only ORC, Parquet, csv and text are supported in hdfs")
else:
    df_sourceFile = spark.read.csv(SourceFile, header=True, sep=SourceFile_Sep, nullValue="")

if targetConType == 'hdfs' and TargetFileFormat.lower() == 'parquet':
    df_TgtFile = spark.read.parquet(TargetFile)
elif targetConType == 'hive':
    df_TgtFile = spark.sql(target_sql)
elif targetConType == 'hdfs':
    if TargetFileFormat.lower() == 'parquet':
        df_TgtFile = spark.read.parquet(TargetFile)
    elif TargetFileFormat.lower() == 'orc':
        df_TgtFile = spark.read.orc(TargetFile)
    elif TargetFileFormat.lower() in ('csv', 'text'):
        df_TgtFile = spark.read.csv(TargetFile, header=True, sep=targetFile_Sep, nullValue="")
    else:
        print("Only ORC, Parquet, csv and text are supported in hdfs")
else:
    df_TgtFile = spark.read.csv(TargetFile, header=True, sep=targetFile_Sep, nullValue="")
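As an aside, the source and target read blocks are nearly identical, so they could be folded into one helper. This is only my own sketch (the function name is made up, it is not part of the job yet):

def read_dataset(con_type, file_format, path, sep=None, sql=None):
    # Hypothetical helper covering the same branches as the two blocks above
    if con_type == 'hive':
        return spark.sql(sql)
    if con_type == 'hdfs':
        fmt = file_format.lower()
        if fmt == 'parquet':
            return spark.read.parquet(path)
        if fmt == 'orc':
            return spark.read.orc(path)
        if fmt in ('csv', 'text'):
            return spark.read.csv(path, header=True, sep=sep, nullValue="")
        raise ValueError("Only ORC, Parquet, csv and text are supported in hdfs")
    # any other connection type: treat the input as a delimited file
    return spark.read.csv(path, header=True, sep=sep, nullValue="")

# df_sourceFile = read_dataset(SourceConType, SourceFileFormat, SourceFile, SourceFile_Sep, source_sql)
# df_TgtFile    = read_dataset(targetConType, TargetFileFormat, TargetFile, targetFile_Sep, target_sql)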
Key column list: the join condition is built from the user-supplied key list. For example, for key_column_list = ['loc','itm','ven'] it builds the join condition plus the left-join filter "b.loc is NULL OR b.itm is NULL OR b.ven is NULL":
join_condition = ''
key_column_list_join = []
b_left_join_condition = ''
b_left_cond = ''
for keycolindex in range(len(key_column_list)):
    join_condition = join_condition + "COALESCE (a." + key_column_list[keycolindex].strip() + \
                     ",'') = COALESCE (b." + key_column_list[keycolindex].strip() + ",'') and "
    b_left_join_condition = b_left_join_condition + "b." + key_column_list[keycolindex].strip() + " is NULL "
    b_left_cond = "b." + key_column_list[keycolindex].strip() + " is NULL "
    key_column_list_join.append(b_left_cond)
# close off the trailing "and " so the ON clause stays valid (this is the "1=1" visible in the generated SQL below)
join_condition = join_condition + '1=1'
key_column_list_join = ' OR '.join(key_column_list_join)
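To make the generated strings concrete, this is what the loop produces for the example key list (same shape as the generated SQL shown further down):

# Worked example with key_column_list = ['loc', 'itm', 'ven']:
# print(join_condition)
#   COALESCE (a.loc,'') = COALESCE (b.loc,'') and COALESCE (a.itm,'') = COALESCE (b.itm,'') and COALESCE (a.ven,'') = COALESCE (b.ven,'') and 1=1
# print(key_column_list_join)
#   b.loc is NULL  OR b.itm is NULL  OR b.ven is NULL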
The block below builds the CASE/COALESCE expressions for all compared (non-key) columns:
d_compare_columns = df_compare_src_df.dtypes
for compare_columns_index in range(len(d_compare_columns)):
    col_name = d_compare_columns[compare_columns_index][0]
    col_type = d_compare_columns[compare_columns_index][1]
    if col_name not in key_column_list and col_type == 'string':
        flag_set_sql = flag_set_sql + ', case WHEN COALESCE (A.' + col_name + ',"")=COALESCE (B.' + \
                       col_name + ',"") THEN "Y" ELSE "N" END AS ' + col_name + "_flag"
        flag_set1_sql = flag_set1_sql + ', "" as ' + col_name + "_flag"
    if col_name not in key_column_list and col_type.startswith(tuple(date_list)):
        flag_set_sql = flag_set_sql + ', case WHEN COALESCE (A.' + col_name + ',"1800-01-01")=COALESCE (B.' + \
                       col_name + ',"1800-01-01") THEN "Y" ELSE "N" END AS ' + col_name + "_flag"
        flag_set1_sql = flag_set1_sql + ', "" as ' + col_name + "_flag"
    if col_name not in key_column_list and col_type.startswith(tuple(dec_list)):
        flag_set_sql = flag_set_sql + ', case WHEN COALESCE (A.' + col_name + ',"-99999999")=COALESCE (B.' + \
                       col_name + ',"-99999999") THEN "Y" ELSE "N" END AS ' + col_name + "_flag"
        flag_set1_sql = flag_set1_sql + ', "" as ' + col_name + "_flag"
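For a single non-key string column the loop yields fragments like the model_flag expression in the generated SQL further down; a tiny standalone illustration (the variable names here are only for this example):

col_name = "model"   # illustration only; a string column from the sample data below
flag_fragment = ', case WHEN COALESCE (A.' + col_name + ',"")=COALESCE (B.' + col_name + \
                ',"") THEN "Y" ELSE "N" END AS ' + col_name + "_flag"
empty_fragment = ', "" as ' + col_name + "_flag"
print(flag_fragment)   # , case WHEN COALESCE (A.model,"")=COALESCE (B.model,"") THEN "Y" ELSE "N" END AS model_flag
print(empty_fragment)  # , "" as model_flag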
Then a hash over all columns is created:
from pyspark.sql.functions import sha2, concat_ws

df_compare_tgt_df = df_compare_tgt_df.withColumn("all_hash_val", sha2(concat_ws("||", *df_compare_tgt_df.columns), 256))
df_compare_src_df = df_compare_src_df.withColumn("all_hash_val", sha2(concat_ws("||", *df_compare_src_df.columns), 256))
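A toy example of what all_hash_val ends up holding (made-up row, not my data): each row becomes one SHA-256 hex digest over all columns joined with "||":

from pyspark.sql.functions import sha2, concat_ws

toy_df = spark.createDataFrame([("L1", "I1", "10.5")], ["loc", "itm", "qty"])
toy_df = toy_df.withColumn("all_hash_val", sha2(concat_ws("||", *toy_df.columns), 256))
toy_df.show(truncate=False)   # all_hash_val is the 64-character hex digest of "L1||I1||10.5"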
Depending on whether the user supplied keys or not, a left outer join on the key columns and an inner join on the all-column hash value identify inserts, deletes, updates and unchanged rows. For changed rows, each compared column gets a Y/N flag.
df_compare_tgt_df.createOrReplaceTempView("NewTable")
df_compare_src_df.createOrReplaceTempView("OldTable")
if len(key_column_list[0]) > 0:
    print("Preparing Compare Sqls")
    print(len(key_column_list))
    new_rec_sql = 'select "TGT" AS SRC, A.* ,"I" as CHG_IND ' + flag_set1_sql + ' from NewTable A left outer join OldTable B on ' + join_condition + ' WHERE ' + key_column_list_join
    del_rec_sql = 'select "SRC" AS SRC, A.*,"D" as CHG_IND ' + flag_set1_sql + ' from OldTable A left outer join NewTable B on ' + join_condition + ' WHERE ' + key_column_list_join
    matched_rec_sql = 'select "SRC" AS SRC , "NC" as CHG_IND, A.* ' + flag_set1_sql + ' from NewTable A inner join OldTable B on ' + join_condition + ' where A.all_hash_val=B.all_hash_val'
    sql_a = 'select "TGT" AS SRC , "C" as CHG_IND, A.* ' + flag_set_sql + ' from NewTable A inner join OldTable B on ' + join_condition + ' where A.all_hash_val!=B.all_hash_val'
    sql_b = 'select "SRC" AS SRC , "C" as CHG_IND, B.* ' + flag_set_sql + ' from NewTable A inner join OldTable B on ' + join_condition + ' where A.all_hash_val!=B.all_hash_val'
    mis_matched_rec_sql = 'select * from ( ' + sql_a + ' union ' + sql_b + ' ) A order by ' + Keys + ', SRC'
else:
    new_rec_sql = 'select "TGT" AS SRC, A.* ,"I" as CHG_IND ' + flag_set1_sql + ' from NewTable A left outer join OldTable B on A.all_hash_val=B.all_hash_val WHERE B.all_hash_val is NULL'
    del_rec_sql = 'select "SRC" AS SRC, A.*,"D" as CHG_IND ' + flag_set1_sql + ' from OldTable A left outer join NewTable B on A.all_hash_val=B.all_hash_val WHERE B.all_hash_val is NULL'
    matched_rec_sql = 'select A.* ,"NC" as CHG_IND from NewTable A inner join OldTable B on A.all_hash_val=B.all_hash_val'
df_new_records = spark.sql(new_rec_sql)
df_del_records = spark.sql(del_rec_sql)
df_matched_records = spark.sql(matched_rec_sql)
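For illustration only, a minimal way these result sets could be materialised afterwards (output_path is a placeholder, not a real location from my job):

output_path = "/hdfstmp/anycompare/compare_results"   # placeholder location for this sketch
df_new_records.write.mode("overwrite").parquet(output_path + "/inserts")
df_del_records.write.mode("overwrite").parquet(output_path + "/deletes")
df_matched_records.write.mode("overwrite").parquet(output_path + "/no_change")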
Essentially it executes the SQL below.
NEW
select "TGT" AS SRC, A.* ,"I" as CHG_IND , "" as period2_flag, "" as period3_flag, "" as model_flag, "" as period1_flag, "" as keycol_flag from NewTable A left outer join OldTable B on COALESCE (a.dmdunit,'') = COALESCE (b.dmdunit,'') and COALESCE (a.dmdgroup,'') = COALESCE (b.dmdgroup,'') and COALESCE (a.loc,'') = COALESCE (b.loc,'') and COALESCE (a.startdate,'') = COALESCE (b.startdate,'') and COALESCE (a.type,'') = COALESCE (b.type,'') and COALESCE (a.fcstid,'') = COALESCE (b.fcstid,'') and 1=1 WHERE b.dmdunit is NULL OR b.dmdgroup is NULL OR b.loc is NULL OR b.startdate is NULL OR b.type is NULL OR b.fcstid is NULL
DELETES
select "SRC" AS SRC, A.*,"D" as CHG_IND , "" as period2_flag, "" as period3_flag, "" as model_flag, "" as period1_flag, "" as keycol_flag from OldTable A left outer join NewTable B on COALESCE (a.dmdunit,'') = COALESCE (b.dmdunit,'') and COALESCE (a.dmdgroup,'') = COALESCE (b.dmdgroup,'') and COALESCE (a.loc,'') = COALESCE (b.loc,'') and COALESCE (a.startdate,'') = COALESCE (b.startdate,'') and COALESCE (a.type,'') = COALESCE (b.type,'') and COALESCE (a.fcstid,'') = COALESCE (b.fcstid,'') and 1=1 WHERE b.dmdunit is NULL OR b.dmdgroup is NULL OR b.loc is NULL OR b.startdate is NULL OR b.type is NULL OR b.fcstid is NULL
NO CHANGES
select "SRC" AS SRC , "NC" as CHG_IND, A.* , "" as period2_flag, "" as period3_flag, "" as model_flag, "" as period1_flag, "" as keycol_flag from NewTable A inner join OldTable B on COALESCE (a.dmdunit,'') = COALESCE (b.dmdunit,'') and COALESCE (a.dmdgroup,'') = COALESCE (b.dmdgroup,'') and COALESCE (a.loc,'') = COALESCE (b.loc,'') and COALESCE (a.startdate,'') = COALESCE (b.startdate,'') and COALESCE (a.type,'') = COALESCE (b.type,'') and COALESCE (a.fcstid,'') = COALESCE (b.fcstid,'') and 1=1 where A.all_hash_val=B.all_hash_val
MISMATCHES
select * from ( select "TGT" AS SRC , "C" as CHG_IND, A.* , case WHEN COALESCE (A.period2,"-99999999")=COALESCE (B.period2,"-99999999") THEN "Y" ELSE "N" END AS period2_flag, case WHEN COALESCE (A.period3,"-99999999")=COALESCE (B.period3,"-99999999") THEN "Y" ELSE "N" END AS period3_flag, case WHEN COALESCE (A.model,"")=COALESCE (B.model,"") THEN "Y" ELSE "N" END AS model_flag, case WHEN COALESCE (A.period1,"-99999999")=COALESCE (B.period1,"-99999999") THEN "Y" ELSE "N" END AS period1_flag, case WHEN COALESCE (A.keycol,"-99999999")=COALESCE (B.keycol,"-99999999") THEN "Y" ELSE "N" END AS keycol_flag from NewTable A inner join OldTable B on COALESCE (a.dmdunit,'') = COALESCE (b.dmdunit,'') and COALESCE (a.dmdgroup,'') = COALESCE (b.dmdgroup,'') and COALESCE (a.loc,'') = COALESCE (b.loc,'') and COALESCE (a.startdate,'') = COALESCE (b.startdate,'') and COALESCE (a.type,'') = COALESCE (b.type,'') and COALESCE (a.fcstid,'') = COALESCE (b.fcstid,'') and 1=1 where A.all_hash_val!=B.all_hash_val union select "SRC" AS SRC , "C" as CHG_IND, B.* , case WHEN COALESCE (A.period2,"-99999999")=COALESCE (B.period2,"-99999999") THEN "Y" ELSE "N" END AS period2_flag, case WHEN COALESCE (A.period3,"-99999999")=COALESCE (B.period3,"-99999999") THEN "Y" ELSE "N" END AS period3_flag, case WHEN COALESCE (A.model,"")=COALESCE (B.model,"") THEN "Y" ELSE "N" END AS model_flag, case WHEN COALESCE (A.period1,"-99999999")=COALESCE (B.period1,"-99999999") THEN "Y" ELSE "N" END AS period1_flag, case WHEN COALESCE (A.keycol,"-99999999")=COALESCE (B.keycol,"-99999999") THEN "Y" ELSE "N" END AS keycol_flag from NewTable A inner join OldTable B on COALESCE (a.dmdunit,'') = COALESCE (b.dmdunit,'') and COALESCE (a.dmdgroup,'') = COALESCE (b.dmdgroup,'') and COALESCE (a.loc,'') = COALESCE (b.loc,'') and COALESCE (a.startdate,'') = COALESCE (b.startdate,'') and COALESCE (a.type,'') = COALESCE (b.type,'') and COALESCE (a.fcstid,'') = COALESCE (b.fcstid,'') and 1=1 where A.all_hash_val!=B.all_hash_val ) A order by DMDUNIT, DMDGROUP, LOC, STARTDATE,TYPE,FCSTID, SRC
Now I am hitting performance problems or OOM errors during execution, so I want to repartition the data so that it runs better and does not fail.
The repartition count is decided from the source row count, because the source data is not partitioned in any meaningful way:
if SourceCount >= 0 and SourceCount <= 1000000:
    v_partition = 200
    spark.conf.set("spark.sql.shuffle.partitions", 200)
elif SourceCount >= 1000001 and SourceCount <= 10000000:
    v_partition = 400
    spark.conf.set("spark.sql.shuffle.partitions", 400)
elif SourceCount >= 10000001 and SourceCount <= 100000000:
    v_partition = 600
    spark.conf.set("spark.sql.shuffle.partitions", 600)
elif SourceCount >= 100000001 and SourceCount <= 1000000000:
    v_partition = 1200
    spark.conf.set("spark.sql.shuffle.partitions", 1200)
elif SourceCount >= 1000000000:
    v_partition = 2400
    spark.conf.set("spark.sql.shuffle.partitions", 2400)
else:
    v_partition = 200
if len(key_column_list):
    print('Repartition the data based on the keys with partition count as - ' + str(v_partition))
    df_compare_tgt_df = df_compare_tgt_df.repartition(v_partition, [x.lower() for x in key_column_list])
    df_compare_src_df = df_compare_src_df.repartition(v_partition, [x.lower() for x in key_column_list])
else:
    print('Repartition the data based on all hash value with partition count as - ' + str(v_partition))
    df_compare_tgt_df = df_compare_tgt_df.repartition(v_partition, "all_hash_val")
    df_compare_src_df = df_compare_src_df.repartition(v_partition, "all_hash_val")
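What I would rather have is a partition count derived from the data size instead of these hard-coded row-count buckets; a rough sketch of the kind of rule I have in mind (the ~256 MB per shuffle partition target is just my assumption):

import math

def choose_partitions(approx_size_gb, target_mb_per_partition=256, minimum=200):
    # Aim for roughly target_mb_per_partition of data per shuffle partition
    return max(minimum, math.ceil(approx_size_gb * 1024 / target_mb_per_partition))

v_partition = choose_partitions(80)                       # 80 GB -> 320 partitions with these numbers
spark.conf.set("spark.sql.shuffle.partitions", v_partition)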
I have 3 worker nodes:
Memory   VCORE
186 GB   25
186 GB   25
183 GB   25
I tried to run the comparison on two 80 GB files with this configuration:
spark-submit --master yarn --deploy-mode cluster --conf "spark.yarn.maxAppAttempts=1" --conf "spark.pyspark.python=/usr/bin/python3" --conf "spark.pyspark.driver=/usr/bin/python3" --executor-memory 6G --driver-memory 4G --executor-cores 8 --num-executors 10
Assuming loc is a column that exists in all of the datasets and the data is evenly spread across loc values, and knowing the current maximum data volume, what would be the best logic to choose the executors, the memory and the repartition factor based on a file's record count and data volume?
Rather than blindly tuning these values, I want a programmatic way of driving this comparison.
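To make it concrete, this is the style of calculation I am hoping for (the 5-cores-per-executor rule of thumb and the overhead allowances are my own assumptions, not validated numbers):

# Cluster from above: 3 workers, ~25 vcores and ~183-186 GB each
nodes, vcores_per_node, mem_gb_per_node = 3, 25, 183

cores_per_executor = 5                                           # commonly quoted rule of thumb, assumed here
executors_per_node = (vcores_per_node - 1) // cores_per_executor # leave 1 core per node for OS/daemons -> 4
num_executors = executors_per_node * nodes - 1                   # leave one slot for the YARN AM -> 11
mem_per_executor_gb = int((mem_gb_per_node - 1) / executors_per_node * 0.93)  # ~7% off for memory overhead

print(num_executors, cores_per_executor, mem_per_executor_gb)    # 11 executors, 5 cores each, ~42 GB each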
Should I instead partition the data by a loc-based bucket number as below, store it, and then run the compare with range_part included as an additional key? But even the partitioning step alone takes a fair amount of time.
df_compare_src_df=spark.sql('select A.*,abs(hash(loc) % 600) as range_part from SourceTable A')
df_compare_src_df.write.mode('overwrite').partitionBy("range_part").parquet("/hdfstmp/anycompare/D6C9DAC3_src_part_oracle_23870_20200716_054852/")
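If I did go that route, I assume the compare would then read the bucketed data back and treat range_part as one more key column, roughly like this:

# Hypothetical follow-up: partition discovery restores range_part as a column on read
df_compare_src_df = spark.read.parquet("/hdfstmp/anycompare/D6C9DAC3_src_part_oracle_23870_20200716_054852/")
key_column_list = key_column_list + ['range_part']   # so the join condition above also matches on the bucket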
Please share your ideas.