How to partition and order randomly in Spark SQL

Posted by rsl1atfo on 2023-03-19 in Apache

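I am trying to use Spark SQL to partition a sample set of customers by region and marketplace, and from each partition I want to return a random 100,000 users.
Is there a way to use random() in the ORDER BY of a window's PARTITION BY clause in Spark SQL? The code below always returns an error when I use random(), but it works perfectly without it. Thanks in advance for your answers!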

customer_panel_s3_location = f"s3://my-bucket/region_id={region_id}/marketplace_id={marketplace_id}/"
customer_panel_table = spark.read.parquet(customer_panel_s3_location)
customer_panel_table.createOrReplaceTempView("customer_panel")
dataset_date = '2023-03-16'
customer_sample_size = 100000
partition_customers_by = 'region_id, marketplace_id'

# The code below returns an error 
customer_panel_df = spark.sql(f"""
    SELECT *
    FROM (
        SELECT *
        , row_number() over (partition by {partition_customers_by} order by random()) AS rn
        FROM
          customer_panel AS c
        WHERE
          c.target_date < CAST('{dataset_date}' AS DATE)
          AND c.target_date >= date_sub(CAST('{dataset_date}' AS DATE), 7)
        ) t
    WHERE t.rn <= bigint({customer_sample_size})
""")

# But after removing 'random()', it works
customer_panel_df = spark.sql(f"""
        SELECT *
        , row_number() over (partition by {partition_customers_by} order by {partition_customers_by}) AS rn
        FROM
          customer_panel AS c
        WHERE
          c.target_date < CAST('{dataset_date}' AS DATE)
          AND c.target_date >= date_sub(CAST('{dataset_date}' AS DATE), 7)
""")

print(f"Row count of customer_panel_df: {customer_panel_df.count():,}")

Answer 1, by jv2fixgn

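In Spark SQL the random number function is rand([seed]). random() is only an alias of rand() in newer Spark versions, so depending on the version you run, the query with random() can fail while the same query with rand() works. A random expression in the ORDER BY of row_number() is not a problem in itself: Spark allows nondeterministic expressions inside window functions.
Keep in mind that rand() is nondeterministic in general, just like random(): without a seed, every run orders the rows differently. Passing an integer seed, rand(seed) in SQL or pyspark.sql.functions.rand(seed) in the DataFrame API, makes the generated values repeatable.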
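To make explicit what row_number() OVER (PARTITION BY ... ORDER BY rand(seed)) combined with rn <= n computes, here is a plain-Python model of it (not Spark code): every row gets a seeded random sort key, rows are sorted by that key within their group, and the first n of each group are kept. The function name and sample data are hypothetical, and Python's random.Random will not reproduce the exact values Spark's rand(seed) generates; only the logic is the same.

```python
import random
from collections import defaultdict


def sample_per_group(rows, group_key, n, seed):
    """Plain-Python model (hypothetical helper) of:
    row_number() OVER (PARTITION BY <group> ORDER BY rand(seed)) AS rn ... WHERE rn <= n
    """
    rng = random.Random(seed)  # seeded generator, playing the role of rand(seed)
    groups = defaultdict(list)
    for row in rows:
        # Attach a random sort key to every row, grouped by the PARTITION BY columns.
        groups[group_key(row)].append((rng.random(), row))

    sample = []
    for keyed_rows in groups.values():
        keyed_rows.sort(key=lambda pair: pair[0])  # ORDER BY the random key
        # row_number() starts at 1, so "rn <= n" keeps the first n rows of each group.
        sample.extend(row for _, row in keyed_rows[:n])
    return sample


# Hypothetical data: 2 regions x 2 marketplaces x 10 customers each.
customers = [
    {"region_id": r, "marketplace_id": m, "customer_id": f"c{r}-{m}-{i}"}
    for r in (1, 2)
    for m in (1, 2)
    for i in range(10)
]
picked = sample_per_group(customers, lambda c: (c["region_id"], c["marketplace_id"]), n=3, seed=7)
print(len(picked))  # prints 12 (4 groups x 3 rows)
```

A group with fewer than n rows is returned whole, which matches the SQL: rn never exceeds the size of its group.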
Here is the question's first query rewritten with rand(seed), selecting a random subset of customers from each partition:

seed = 42  # any fixed integer; makes the random order repeatable between runs

customer_panel_df = spark.sql(f"""
    SELECT *
    FROM (
        SELECT *,
            row_number() over (partition by {partition_customers_by} order by rand({seed})) AS rn
        FROM customer_panel AS c
        WHERE c.target_date < CAST('{dataset_date}' AS DATE)
          AND c.target_date >= date_sub(CAST('{dataset_date}' AS DATE), 7)
        ) t
    WHERE t.rn <= {customer_sample_size}
""")

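Because rand() takes a seed, the random values, and therefore the selected customers, are the same on every run as long as the input data and its distribution across Spark partitions do not change. Any integer works as the seed.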
