我的目标是从源重新划分数据并将其保存在目标路径。我打算在每个分区中只创建一个s3对象,并通过以下方法实现了这一点:
df.repartition("created_year", "created_month", "created_day").write.mode('overwrite').partitionBy( "created_year", "created_month", "created_day").parquet(dest_path)
我想确保所有的数据都已传输,并且我了解到重新分区可能会丢失重复的数据。因此,我决定检查源和目标的不同计数是否应该匹配。所以,我做了以下工作:
source_df.distinct().count() == destination.distinct().count()
这是回报 False
表示在已完成所有任务的作业中,源和目标的不同计数是不同的。
这是检查完整数据是否被重新分区和保存的正确方法吗?什么是更好的/正确的方法?
源和目标是amazons3上的两个不同的bucket。
可能的mvc是:
def count_distinct(src_path, spark):
try:
df = spark.read.parquet(f'{src_path}')
distinct_count = df.distinct().count()
print(distinct_count)
return distinct_count
except:
log_failed_bucket(src_path)
return None
def compare_distinct(spark, bucket_name):
src_path = form_path_string(bucket_name)
original_distinct_count = count_distinct(src_path, spark)
dest_path = form_path_string(bucket_name, repartitioned_data=True)
final_distinct_count = count_distinct(dest_path, spark)
return original_distinct_count == final_distinct_count
1条答案
按热度按时间5m1hhzi41#
除非你把所有的栏都写进去了
partitionBy
在编写和提供中的所有列时,不可能删除重复项partitionBy
也是not possible
.如果有任何
nulls or empty
任何分区列的__HIVE_DEFAULT_PARTITION__
文件夹分别添加到分区列。If multiple paths are read using spark.read.format().load(), then you should provide
basePathoption (there's a chance of missing paths if it's formed dynamically), else you could directly load
basePathand follow sanity approach mentioned below
您可以在基于源数据集和目标数据集之间的分区列进行分组后检查count/distinct。可以使用sourcepath从basepath检查总计数。
源和目标之间分区列的不同值计数组合检查。