python3.x—如何确保在成功完成spark作业的情况下重新分区完整的数据?

tf7tbtn2  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(408)

我的目标是从源重新划分数据并将其保存在目标路径。我打算在每个分区中只创建一个s3对象,并通过以下方法实现了这一点:

df.repartition("created_year", "created_month", "created_day").write.mode('overwrite').partitionBy( "created_year", "created_month", "created_day").parquet(dest_path)

我想确保所有的数据都已传输,并且我了解到重新分区可能会丢失重复的数据。因此,我决定检查源和目标的不同计数是否应该匹配。所以,我做了以下工作:

source_df.distinct().count() == destination.distinct().count()

这是回报 False 表示在已完成所有任务的作业中,源和目标的不同计数是不同的。
这是检查完整数据是否被重新分区和保存的正确方法吗?什么是更好的/正确的方法?
源和目标是amazons3上的两个不同的bucket。
可能的mvc是:

def count_distinct(src_path, spark):
    try:
        df = spark.read.parquet(f'{src_path}')
        distinct_count = df.distinct().count()
        print(distinct_count)
        return distinct_count
    except:
        log_failed_bucket(src_path)
        return None

def compare_distinct(spark, bucket_name):
    src_path = form_path_string(bucket_name)
    original_distinct_count = count_distinct(src_path, spark)
    dest_path = form_path_string(bucket_name, repartitioned_data=True)
    final_distinct_count = count_distinct(dest_path, spark)
    return original_distinct_count == final_distinct_count
5m1hhzi4

5m1hhzi41#

除非你把所有的栏都写进去了 partitionBy 在编写和提供中的所有列时,不可能删除重复项 partitionBy 也是 not possible .
如果有任何 nulls or empty 任何分区列的 __HIVE_DEFAULT_PARTITION__ 文件夹分别添加到分区列。 If multiple paths are read using spark.read.format().load(), then you should providebasePathoption (there's a chance of missing paths if it's formed dynamically), else you could directly loadbasePathand follow sanity approach mentioned below 您可以在基于源数据集和目标数据集之间的分区列进行分组后检查count/distinct。
可以使用sourcepath从basepath检查总计数。
源和目标之间分区列的不同值计数组合检查。

相关问题