Spark: join datasets and find differences - data comparator

byqmnocz, posted 2021-05-27 in Spark

I am converting a set of applications to a different technology, and as part of that I need to compare the old data against the new data. The idea is to have a generic framework that can compare any two inputs, and also produce a per-column flag showing whether that column's value changed.
There are two kinds of comparison: one with a key and one without a key.
The framework needs to handle any record count up to roughly 3.5 billion rows, with data volumes ranging from 1 GB up to about 400 GB.
Another common pattern is that every dataset has a loc or itm column and the data is evenly distributed over those values, but the data stored in HDFS is not partitioned on them. The approach below works fine for medium volumes but fails when comparing 80 GB of data.
First, the source and target datasets are read:

# Read the source dataset
if SourceConType == 'hive':
    df_sourceFile = spark.sql(source_sql)
elif SourceConType == 'hdfs':
    if SourceFileFormat.lower() == 'parquet':
        df_sourceFile = spark.read.parquet(SourceFile)
    elif SourceFileFormat.lower() == 'orc':
        df_sourceFile = spark.read.orc(SourceFile)
    elif SourceFileFormat.lower() in ('csv', 'text'):
        df_sourceFile = spark.read.csv(SourceFile, header=True, sep=SourceFile_Sep, nullValue="")
    else:
        print("Only ORC, Parquet, csv and text are supported in hdfs")
else:
    df_sourceFile = spark.read.csv(SourceFile, header=True, sep=SourceFile_Sep, nullValue="")

# Read the target dataset
if targetConType == 'hive':
    df_TgtFile = spark.sql(target_sql)
elif targetConType == 'hdfs':
    if TargetFileFormat.lower() == 'parquet':
        df_TgtFile = spark.read.parquet(TargetFile)
    elif TargetFileFormat.lower() == 'orc':
        df_TgtFile = spark.read.orc(TargetFile)
    elif TargetFileFormat.lower() in ('csv', 'text'):
        df_TgtFile = spark.read.csv(TargetFile, header=True, sep=targetFile_Sep, nullValue="")
    else:
        print("Only ORC, Parquet, csv and text are supported in hdfs")
else:
    df_TgtFile = spark.read.csv(TargetFile, header=True, sep=targetFile_Sep, nullValue="")

Key column list: based on the user-supplied list of keys, the join condition is built. For example, for key_column_list = ['loc','itm','ven'] it builds the equality join condition plus the anti-join predicate "b.loc is NULL OR b.itm is NULL OR b.ven is NULL":

# Build the equality join condition and the per-key "is NULL" predicates from the key list
join_condition = ''
key_column_list_join = []
b_left_join_condition = ''
for key_col in key_column_list:
    key_col = key_col.strip()
    join_condition += "COALESCE (a." + key_col + ",'') = COALESCE (b." + key_col + ",'') and "
    b_left_join_condition += "b." + key_col + " is NULL "
    key_column_list_join.append("b." + key_col + " is NULL ")

key_column_list_join = ' OR '.join(key_column_list_join)
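
For example, with key_column_list = ['loc','itm','ven'] the loop produces roughly the following strings (the trailing "and " on join_condition is later closed off with a 1=1, as can be seen in the generated SQL further down):

join_condition       = COALESCE (a.loc,'') = COALESCE (b.loc,'') and COALESCE (a.itm,'') = COALESCE (b.itm,'') and COALESCE (a.ven,'') = COALESCE (b.ven,'') and
key_column_list_join = b.loc is NULL  OR b.itm is NULL  OR b.ven is NULL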

The following builds a CASE/COALESCE expression for every non-key column (one _flag column per compared column):

# flag_set_sql / flag_set1_sql are assumed to be initialised to '' before this block;
# date_list / dec_list are assumed to hold the type-name prefixes treated as dates and decimals
d_compare_columns = df_compare_src_df.dtypes
for col_name, col_type in d_compare_columns:
    if col_name in key_column_list:
        continue
    if col_type == 'string':
        default_val = '""'
    elif col_type.startswith(tuple(date_list)):
        default_val = '"1800-01-01"'
    elif col_type.startswith(tuple(dec_list)):
        default_val = '"-99999999"'
    else:
        continue
    flag_set_sql += ', case WHEN COALESCE (A.' + col_name + ',' + default_val + \
                    ')=COALESCE (B.' + col_name + ',' + default_val + \
                    ') THEN "Y" ELSE "N" END AS ' + col_name + '_flag'
    flag_set1_sql += ', "" as ' + col_name + '_flag'

Then a hash over all columns is created:

from pyspark.sql.functions import sha2, concat_ws

df_compare_tgt_df = df_compare_tgt_df.withColumn("all_hash_val", sha2(concat_ws("||", *df_compare_tgt_df.columns), 256))
df_compare_src_df = df_compare_src_df.withColumn("all_hash_val", sha2(concat_ws("||", *df_compare_src_df.columns), 256))

Depending on whether the user supplied key columns: a left outer join on the key columns identifies inserts and deletes, and an inner join combined with the all-column hash value identifies updates and unchanged rows. For updated rows, each compared column gets a Y/N flag showing whether its value matches.

df_compare_tgt_df.createOrReplaceTempView("NewTable")
df_compare_src_df.createOrReplaceTempView("OldTable")
if len(key_column_list[0]) > 0:
    print("Preparing Compare Sqls")
    print(len(key_column_list))
    new_rec_sql = 'select "TGT" AS SRC, A.* ,"I" as CHG_IND ' + flag_set1_sql + ' from NewTable A left outer join OldTable B on ' + join_condition + ' WHERE ' + key_column_list_join
    del_rec_sql = 'select "SRC" AS SRC, A.*,"D" as CHG_IND ' + flag_set1_sql + ' from OldTable A left outer join NewTable B on ' + join_condition + ' WHERE ' + key_column_list_join
    matched_rec_sql = 'select "SRC" AS SRC , "NC" as CHG_IND, A.* ' + flag_set1_sql + ' from NewTable A inner join OldTable B on ' + join_condition + ' where A.all_hash_val=B.all_hash_val'
    sql_a = 'select "TGT" AS SRC , "C" as CHG_IND, A.* ' + flag_set_sql + ' from NewTable A inner join OldTable B on ' + join_condition + ' where A.all_hash_val!=B.all_hash_val'
    sql_b = 'select "SRC" AS SRC , "C" as CHG_IND, B.* ' + flag_set_sql + ' from NewTable A inner join OldTable B on ' + join_condition + ' where A.all_hash_val!=B.all_hash_val'
    # Keys is assumed to be the comma-separated key list, used only for ordering the output
    mis_matched_rec_sql = 'select * from ( ' + sql_a + ' union ' + sql_b + ' ) A order by ' + Keys + ', SRC'
else:
    new_rec_sql = 'select "TGT" AS SRC, A.* ,"I" as CHG_IND ' + flag_set1_sql + ' from NewTable A left outer join OldTable B on A.all_hash_val=B.all_hash_val WHERE B.all_hash_val is NULL'
    del_rec_sql = 'select "SRC" AS SRC, A.*,"D" as CHG_IND ' + flag_set1_sql + ' from OldTable A left outer join NewTable B on A.all_hash_val=B.all_hash_val WHERE B.all_hash_val is NULL'
    matched_rec_sql = 'select A.* ,"NC" as CHG_IND from NewTable A inner join OldTable B on A.all_hash_val=B.all_hash_val'

df_new_records = spark.sql(new_rec_sql)
df_del_records = spark.sql(del_rec_sql)
df_matched_records = spark.sql(matched_rec_sql)

Essentially, it ends up executing SQL like the following:

NEW
select "TGT" AS SRC, A.* ,"I" as CHG_IND , "" as period2_flag, "" as period3_flag, "" as model_flag, "" as period1_flag, "" as keycol_flag from NewTable A left outer join OldTable B on COALESCE (a.dmdunit,'') = COALESCE (b.dmdunit,'') and COALESCE (a.dmdgroup,'') = COALESCE (b.dmdgroup,'') and COALESCE (a.loc,'') = COALESCE (b.loc,'') and COALESCE (a.startdate,'') = COALESCE (b.startdate,'') and COALESCE (a.type,'') = COALESCE (b.type,'') and COALESCE (a.fcstid,'') = COALESCE (b.fcstid,'') and  1=1  WHERE b.dmdunit is NULL  OR b.dmdgroup is NULL  OR b.loc is NULL  OR b.startdate is NULL  OR b.type is NULL  OR b.fcstid is NULL 

DELETES
select "SRC" AS SRC, A.*,"D" as CHG_IND , "" as period2_flag, "" as period3_flag, "" as model_flag, "" as period1_flag, "" as keycol_flag from OldTable A left outer join NewTable B on COALESCE (a.dmdunit,'') = COALESCE (b.dmdunit,'') and COALESCE (a.dmdgroup,'') = COALESCE (b.dmdgroup,'') and COALESCE (a.loc,'') = COALESCE (b.loc,'') and COALESCE (a.startdate,'') = COALESCE (b.startdate,'') and COALESCE (a.type,'') = COALESCE (b.type,'') and COALESCE (a.fcstid,'') = COALESCE (b.fcstid,'') and  1=1  WHERE b.dmdunit is NULL  OR b.dmdgroup is NULL  OR b.loc is NULL  OR b.startdate is NULL  OR b.type is NULL  OR b.fcstid is NULL 

NO CHANGES
select "SRC" AS SRC , "NC" as CHG_IND, A.* , "" as period2_flag, "" as period3_flag, "" as model_flag, "" as period1_flag, "" as keycol_flag from NewTable A inner join OldTable B on COALESCE (a.dmdunit,'') = COALESCE (b.dmdunit,'') and COALESCE (a.dmdgroup,'') = COALESCE (b.dmdgroup,'') and COALESCE (a.loc,'') = COALESCE (b.loc,'') and COALESCE (a.startdate,'') = COALESCE (b.startdate,'') and COALESCE (a.type,'') = COALESCE (b.type,'') and COALESCE (a.fcstid,'') = COALESCE (b.fcstid,'') and  1=1  where A.all_hash_val=B.all_hash_val

MISMATCHES
select * from ( select "TGT" AS SRC , "C" as CHG_IND, A.* , case WHEN COALESCE (A.period2,"-99999999")=COALESCE (B.period2,"-99999999") THEN "Y" ELSE "N" END AS period2_flag, case WHEN COALESCE (A.period3,"-99999999")=COALESCE (B.period3,"-99999999") THEN "Y" ELSE "N" END AS period3_flag, case WHEN COALESCE (A.model,"")=COALESCE (B.model,"") THEN "Y" ELSE "N" END AS model_flag, case WHEN COALESCE (A.period1,"-99999999")=COALESCE (B.period1,"-99999999") THEN "Y" ELSE "N" END AS period1_flag, case WHEN COALESCE (A.keycol,"-99999999")=COALESCE (B.keycol,"-99999999") THEN "Y" ELSE "N" END AS keycol_flag from NewTable A inner join OldTable B on COALESCE (a.dmdunit,'') = COALESCE (b.dmdunit,'') and COALESCE (a.dmdgroup,'') = COALESCE (b.dmdgroup,'') and COALESCE (a.loc,'') = COALESCE (b.loc,'') and COALESCE (a.startdate,'') = COALESCE (b.startdate,'') and COALESCE (a.type,'') = COALESCE (b.type,'') and COALESCE (a.fcstid,'') = COALESCE (b.fcstid,'') and  1=1  where A.all_hash_val!=B.all_hash_val union select "SRC" AS SRC , "C" as CHG_IND, B.* , case WHEN COALESCE (A.period2,"-99999999")=COALESCE (B.period2,"-99999999") THEN "Y" ELSE "N" END AS period2_flag, case WHEN COALESCE (A.period3,"-99999999")=COALESCE (B.period3,"-99999999") THEN "Y" ELSE "N" END AS period3_flag, case WHEN COALESCE (A.model,"")=COALESCE (B.model,"") THEN "Y" ELSE "N" END AS model_flag, case WHEN COALESCE (A.period1,"-99999999")=COALESCE (B.period1,"-99999999") THEN "Y" ELSE "N" END AS period1_flag, case WHEN COALESCE (A.keycol,"-99999999")=COALESCE (B.keycol,"-99999999") THEN "Y" ELSE "N" END AS keycol_flag from NewTable A inner join OldTable B on COALESCE (a.dmdunit,'') = COALESCE (b.dmdunit,'') and COALESCE (a.dmdgroup,'') = COALESCE (b.dmdgroup,'') and COALESCE (a.loc,'') = COALESCE (b.loc,'') and COALESCE (a.startdate,'') = COALESCE (b.startdate,'') and COALESCE (a.type,'') = COALESCE (b.type,'') and COALESCE (a.fcstid,'') = COALESCE (b.fcstid,'') and  1=1  where A.all_hash_val!=B.all_hash_val ) A order by DMDUNIT, DMDGROUP, LOC, STARTDATE,TYPE,FCSTID, SRC

Now I am hitting performance problems or OOM errors during execution, so I want to repartition the data so that it runs better and does not fail.
The repartition count is decided from the source row count, because the source data is not partitioned in any useful way:

# SourceCount is assumed to be the source row count (e.g. df_compare_src_df.count())
if SourceCount <= 1000000:
    v_partition = 200
elif SourceCount <= 10000000:
    v_partition = 400
elif SourceCount <= 100000000:
    v_partition = 600
elif SourceCount <= 1000000000:
    v_partition = 1200
else:
    v_partition = 2400
spark.conf.set("spark.sql.shuffle.partitions", v_partition)

if len(key_column_list):
    print('Repartition the data based on the keys with partition count as - ' + str(v_partition))
    df_compare_tgt_df = df_compare_tgt_df.repartition(v_partition, [x.lower() for x in key_column_list])
    df_compare_src_df = df_compare_src_df.repartition(v_partition, [x.lower() for x in key_column_list])
else:
    print('Repartition the data based on all_hash_val with partition count as - ' + str(v_partition))
    df_compare_tgt_df = df_compare_tgt_df.repartition(v_partition, "all_hash_val")
    df_compare_src_df = df_compare_src_df.repartition(v_partition, "all_hash_val")
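
As a quick sanity check (not part of the original flow), the resulting partition count can be inspected after the repartition:

# Should match v_partition after the repartition above
print(df_compare_src_df.rdd.getNumPartitions())
print(df_compare_tgt_df.rdd.getNumPartitions())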

I have 3 worker nodes:

Memory   VCores
186 GB   25
186 GB   25
183 GB   25

I tried to compare two 80 GB files with this spark-submit configuration:

spark-submit --master yarn --deploy-mode cluster --conf "spark.yarn.maxAppAttempts=1" --conf "spark.pyspark.python=/usr/bin/python3" --conf "spark.pyspark.driver=/usr/bin/python3"  --executor-memory 6G --driver-memory 4G --executor-cores 8 --num-executors 10

Given that loc is a column present in all of the data, that the data is evenly distributed over loc, and that the current maximum data volume is known, what would be the best logic to pick the number of executors, executor memory, and the repartition factor from a file's record count and data size?
I don't want to tune these values blindly; I want a programmatic way of sizing this comparison.
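
For illustration only, this is the kind of programmatic sizing I have in mind. Everything in it (the 128 MB target partition size, 5 cores per executor, the headroom factors, and the function name suggest_sizing) is a rule-of-thumb assumption for the sketch, not something the framework already does:

# Rough sizing sketch only; dataset_size_bytes would come from the input file sizes in HDFS.
def suggest_sizing(dataset_size_bytes, total_cluster_cores=75, total_cluster_mem_gb=555):
    target_partition_bytes = 128 * 1024 * 1024                          # assumed shuffle-partition target
    shuffle_partitions = max(200, dataset_size_bytes // target_partition_bytes)

    cores_per_executor = 5                                              # assumed rule of thumb
    usable_cores = total_cluster_cores - 3                              # leave ~1 core per node for overhead
    num_executors = max(2, usable_cores // cores_per_executor)
    executor_mem_gb = int(total_cluster_mem_gb * 0.9 // num_executors)  # leave ~10% for YARN/OS

    return shuffle_partitions, num_executors, cores_per_executor, executor_mem_gb

# Example for two 80 GB inputs on the 3-node, 25-vcore cluster described above:
print(suggest_sizing(160 * 1024 ** 3))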
Or should I instead bucket the data on loc as below, store it partitioned by range_part, and then run the comparison with range_part included as an additional key? Even just that partitioning step already takes quite some time.

df_compare_src_df=spark.sql('select A.*,abs(hash(loc) % 600) as range_part from SourceTable A')
df_compare_src_df.write.mode('overwrite').partitionBy("range_part").parquet("/hdfstmp/anycompare/D6C9DAC3_src_part_oracle_23870_20200716_054852/")
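
To make "range_part as a key" concrete, a minimal sketch of the idea (the target-side path here is hypothetical, and reusing key_column_list this way is my own assumption):

# Hypothetical sketch: read both pre-bucketed datasets back and extend the key list,
# so the join-condition builder above also matches on the bucket column.
df_compare_src_df = spark.read.parquet("/hdfstmp/anycompare/D6C9DAC3_src_part_oracle_23870_20200716_054852/")
df_compare_tgt_df = spark.read.parquet("/hdfstmp/anycompare/tgt_part/")  # hypothetical target path
key_column_list = key_column_list + ['range_part']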

Please share your ideas.
