python 为什么我的Pyspark代码返回的行数与Hive SQL查询的行数不同?

jgovgodb  于 2023-04-28  发布在  Python
关注(0)|答案(1)|浏览(144)

我有一个Hive SQL查询(在AWS Athena上运行),我试图将其转换为Pyspark代码。SQL查询和Pyspark代码都运行在相同的数据上。但是,当我比较这两个查询的输出行计数时,它们有很大的不同。
SQL查询返回250,000行,而Pyspark代码返回3.07亿行。我仔细检查了Pyspark代码,它似乎等同于SQL查询。我不知道为什么Pyspark代码会返回这么多行。
下面是我尝试转换的Hive SQL查询:

self_joined_tbl AS (
    SELECT
        tbl1.item_id,
        tbl1.p_date,
        tbl1.uid,
        tbl1.direct_uid,
        tbl1.related_uid,
        tbl1.uid_type,
        COALESCE(tbl2.related_uid, tbl3.related_uid) AS new_related_uid
    FROM input_table AS tbl1
    LEFT JOIN input_table AS tbl2
        ON tbl1.direct_uid = tbl2.direct_uid
        AND tbl1.item_id = tbl2.item_id
        AND tbl2.uid_type IN ('new')
    LEFT JOIN input_table AS tbl3
        ON tbl1.direct_uid = tbl3.direct_uid
        AND tbl1.item_id = tbl3.item_id
        AND tbl3.uid_type IN ('old')
)

下面是我从SQL查询转换而来的Pyspark代码:

self_joined_tbl_df = (
        input_table_df.alias("tbl1")
        .join(
            input_table_df.alias("tbl2"),
            on=(
                (F.col("tbl1.direct_uid") == F.col("tbl2.direct_uid"))
                & (F.col("tbl1.item_id") == F.col("tbl2.item_id"))
                & (F.col("tbl2.uid_type").isin("new"))
            ),
            how="left"
        )
        .join(
            input_table_df.alias("tbl3"),
            on=(
                (F.col("tbl1.direct_uid") == F.col("tbl3.direct_uid"))
                & (F.col("tbl1.item_id") == F.col("tbl3.item_id"))
                & (F.col("tbl3.uid_type").isin("old"))
            ),
            how="left"
        )
        .select(
            F.col("tbl1.item_id"),
            F.col("tbl1.item_type"),
            F.col("tbl1.item_name"),
            F.col("tbl1.p_date"),
            F.col("tbl1.uid"),
            F.col("tbl1.direct_uid"),
            F.col("tbl1.related_uid"),
            F.col("tbl1.uid_type"),
            F.coalesce(F.col("tbl2.related_uid"),
                       F.col("tbl3.related_uid")).alias("new_related_uid")
        )
)

我希望有人能帮助我确定为什么Pyspark代码返回的行比SQL查询多得多,并建议对Pyspark代码进行任何更改以解决这个问题。谢谢大家!

zhte4eai

zhte4eai1#

我建议看看non-deterministic behavior of pyspark。我最近遇到了类似的问题,但在我的情况下,是dropDuplicates引入了变量结果。通过多次执行代码并在某些条件下预览计数(执行3-5次后可能会出现问题),看看这是否是您的问题。

相关问题