我有一个Hive SQL查询(在AWS Athena上运行),我试图将其转换为Pyspark代码。SQL查询和Pyspark代码都运行在相同的数据上。但是,当我比较这两个查询的输出行计数时,它们有很大的不同。
SQL查询返回250,000行,而Pyspark代码返回3.07亿行。我仔细检查了Pyspark代码,它似乎等同于SQL查询。我不知道为什么Pyspark代码会返回这么多行。
下面是我尝试转换的Hive SQL查询:
self_joined_tbl AS (
SELECT
tbl1.item_id,
tbl1.p_date,
tbl1.uid,
tbl1.direct_uid,
tbl1.related_uid,
tbl1.uid_type,
COALESCE(tbl2.related_uid, tbl3.related_uid) AS new_related_uid
FROM input_table AS tbl1
LEFT JOIN input_table AS tbl2
ON tbl1.direct_uid = tbl2.direct_uid
AND tbl1.item_id = tbl2.item_id
AND tbl2.uid_type IN ('new')
LEFT JOIN input_table AS tbl3
ON tbl1.direct_uid = tbl3.direct_uid
AND tbl1.item_id = tbl3.item_id
AND tbl3.uid_type IN ('old')
)
下面是我从SQL查询转换而来的Pyspark代码:
self_joined_tbl_df = (
input_table_df.alias("tbl1")
.join(
input_table_df.alias("tbl2"),
on=(
(F.col("tbl1.direct_uid") == F.col("tbl2.direct_uid"))
& (F.col("tbl1.item_id") == F.col("tbl2.item_id"))
& (F.col("tbl2.uid_type").isin("new"))
),
how="left"
)
.join(
input_table_df.alias("tbl3"),
on=(
(F.col("tbl1.direct_uid") == F.col("tbl3.direct_uid"))
& (F.col("tbl1.item_id") == F.col("tbl3.item_id"))
& (F.col("tbl3.uid_type").isin("old"))
),
how="left"
)
.select(
F.col("tbl1.item_id"),
F.col("tbl1.item_type"),
F.col("tbl1.item_name"),
F.col("tbl1.p_date"),
F.col("tbl1.uid"),
F.col("tbl1.direct_uid"),
F.col("tbl1.related_uid"),
F.col("tbl1.uid_type"),
F.coalesce(F.col("tbl2.related_uid"),
F.col("tbl3.related_uid")).alias("new_related_uid")
)
)
我希望有人能帮助我确定为什么Pyspark代码返回的行比SQL查询多得多,并建议对Pyspark代码进行任何更改以解决这个问题。谢谢大家!
1条答案
按热度按时间zhte4eai1#
我建议看看non-deterministic behavior of pyspark。我最近遇到了类似的问题,但在我的情况下,是
dropDuplicates
引入了变量结果。通过多次执行代码并在某些条件下预览计数(执行3-5次后可能会出现问题),看看这是否是您的问题。