Apache Spark 基于ID值范围对ID列进行分区

jw5wzhpr  于 2023-03-30  发布在  Apache
关注(0)|答案(2)|浏览(107)

我有两张table;sales和customers。要查询的主表是sales,但有时我们希望获取有关particiae customer的数据并获取有关他的详细信息,因此我们必须在sales表上联接customers。因此,对于具有用户的表,主筛选列将是USER_ID。我想根据user_id对表进行分区,但是Databricks为每个userids创建一个分区文件。我想做的是将表分成多个文件,这些文件中有多个userid连续保存,例如partition 1将有ID为1-1000的用户,partition 2将拥有ID为1001-2000的用户,依此类推。
我有同样的问题与分区的日期,因为它创建了一个分区文件的每一天,但我想有它存储为例如5天的范围。
有没有办法在列内的范围内存储分区?如何影响创建多少个这样的分区?
到目前为止,我使用的是df.write.partitionBy('column_name').parquet('location'),它产生了上面描述的问题。

vuktfyat

vuktfyat1#

我做了一些类似的事情,通过从有问题的id中生成一个值来进行分区。如果它们是数字ID,您可以将它们与您希望收集的所需分区的数量进行模数转换,如果它们是足够随机的数字,甚至是连续的数字,它们应该可以很好地减少偏斜。

hvvq6cgz

hvvq6cgz2#

您可以通过指定分区方案来自定义创建分区的方式。您可以定义自己的分区函数,根据列中的值范围对数据进行分组,而不是使用基于列中的非重复值的默认分区。
下面是一个示例,说明如何根据USER_ID的范围对sales表进行分区:

from pyspark.sql.functions import *

def user_id_range_partition(user_id):
    return floor(user_id / 1000)

sales.write.partitionBy(user_id_range_partition('USER_ID')).parquet('location')

在这里,user_id_range_partition函数接受一个USER_ID值,并返回该值除以1000的floor除法,这将USER_ID分组为1000的范围。例如,USER_ID 1-1000将在分区0中,USER_ID 1001-2000将在分区1中,依此类推。
同样,日期也可以这样做-

from pyspark.sql.functions import *

# Define the partitioning function that groups dates into ranges of 5 days
def date_range_partition(date_col):
    start_date = to_date(lit('2022-01-01'))  # define your own start date
    days_since_start = floor((date_col - start_date).cast('int') / 5) * 5
    return date_add(start_date, days_since_start)

# Partition the sales table based on date_range_partition function
sales.withColumn('sale_date_range', date_range_partition('SALE_DATE')).write.partitionBy('sale_date_range').parquet('location')

此外,您还可以使用bucketBy。它的工作方式是根据指定列的哈希值将数据分配到固定数量的存储桶中。这对于在固定数量的文件中均匀分配数据非常有用,同时仍然允许基于用于桶化的列的有效过滤。您可以使用bucketBysales数据分布到基于USER_ID列的10个bucket中-

from pyspark.sql.functions import floor

# Define the number of buckets and the bucketing column
num_buckets = 10
bucket_column = 'USER_ID'

# Define the bucketing function that hashes USER_ID into one of 10 buckets
def bucket_user_id(user_id):
    return user_id % num_buckets

# Bucket the sales table based on the bucket_user_id function and the bucket_column
sales.write.bucketBy(num_buckets, bucket_column, {'numBuckets': num_buckets}).parquet('location')

相关问题