I'm trying to use Spark SQL to partition a customer sample set by region and marketplace, and within each partition I want to select 100,000 users at random. Is there a way to use random() in Spark SQL's partition by ... order by clause? The code below, which uses random(), always returns an error, but without random() it works perfectly. Thanks in advance for your answers!
# Read the customer panel for one region/marketplace and register it as a temp view for Spark SQL
customer_panel_s3_location = f"s3://my-bucket/region_id={region_id}/marketplace_id={marketplace_id}/"
customer_panel_table = spark.read.parquet(customer_panel_s3_location)
customer_panel_table.createOrReplaceTempView("customer_panel")

dataset_date = '2023-03-16'
customer_sample_size = 100000
partition_customers_by = 'region_id, marketplace_id'
# The code below returns an error
customer_panel_df = spark.sql(f"""
    SELECT *
    FROM (
        SELECT *,
               row_number() OVER (PARTITION BY {partition_customers_by} ORDER BY random()) AS rn
        FROM customer_panel AS c
        WHERE c.target_date < CAST('{dataset_date}' AS DATE)
          AND c.target_date >= date_sub(CAST('{dataset_date}' AS DATE), 7)
    ) t
    WHERE t.rn <= bigint({customer_sample_size})
""")
# But after removing 'random()', it works
customer_panel_df = spark.sql(f"""
    SELECT *,
           row_number() OVER (PARTITION BY {partition_customers_by} ORDER BY {partition_customers_by}) AS rn
    FROM customer_panel AS c
    WHERE c.target_date < CAST('{dataset_date}' AS DATE)
      AND c.target_date >= date_sub(CAST('{dataset_date}' AS DATE), 7)
""")
print(f"Row count of {table_name}: {customer_panel_df.count():,}")
1 Answer
Unfortunately, in Spark SQL you cannot use the random() function in the ORDER BY clause of the row_number() window function, because random() generates a non-deterministic value, meaning it can produce different results for the same input. One possible way to achieve the desired result is to use the rand() function instead of random(). rand() generates deterministic values based on a seed, which can be supplied as an argument (in PySpark, pyspark.sql.functions.rand(seed)). Here is the same implementation, selecting a random subset of customers from each partition:
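A minimal sketch of that query, assuming the question's variables (partition_customers_by, dataset_date, customer_sample_size) are still in scope; the seed value 42 is an arbitrary choice:

# Same sampling query as in the question, with rand(42) in place of random();
# the seed makes the per-partition ordering reproducible across runs
customer_panel_df = spark.sql(f"""
    SELECT *
    FROM (
        SELECT *,
               row_number() OVER (PARTITION BY {partition_customers_by} ORDER BY rand(42)) AS rn
        FROM customer_panel AS c
        WHERE c.target_date < CAST('{dataset_date}' AS DATE)
          AND c.target_date >= date_sub(CAST('{dataset_date}' AS DATE), 7)
    ) t
    WHERE t.rn <= {customer_sample_size}
""")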
Note that the rand() function takes a seed value as a parameter, which ensures it returns deterministic values for each partition. You can choose any integer as the seed, depending on your use case.
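As an optional sanity check (a sketch, assuming the sampled result is held in customer_panel_df), you can count rows per partition and confirm that no group exceeds the sample size:

from pyspark.sql import functions as F

# Count sampled rows per (region_id, marketplace_id) group; each count
# should be at most customer_sample_size (100,000)
(customer_panel_df
    .groupBy("region_id", "marketplace_id")
    .agg(F.count("*").alias("sampled_customers"))
    .show())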