How do I convert this SQL query into PySpark code?

x4shl7ld asked on 2021-05-18 in Spark

I have this table:

df = spark.createDataFrame(
    [
        (1, 12345, "a@gmail.com", "2020-01-01"),
        (1, 12345, "a@gmail.com", "2020-01-02"),
        (1, 23456, "a@gmail.com", "2020-01-03"),
        (1, 34567, "a@gmail.com", "2020-01-04"),
        (1, 12345, "a@gmail.com", "2020-01-05"),
        (1, 45678, "a@gmail.com", "2020-01-06"),
        (1, 45678, "a@gmail.com", "2020-01-07"),
        (2, 56789, "b@gmail.com", "2020-01-01"),
        (2, 56789, "b@gmail.com", "2020-01-02"),
        (2, 56789, "c@gmail.com", "2020-01-03"),
        (2, 67890, "c@gmail.com", "2020-01-04"),
        (2, 67890, "c@gmail.com", "2020-01-05"),
        (3, 78901, "d@gmail.com", "2020-01-01"),
        (3, 78901, "d@gmail.com", "2020-01-02"),
        (3, 78901, "d@gmail.com", "2020-01-03"),
    ],
    ["id", "phone_number", "email", "date"],
)

I want to select all rows that are either the first date for each id, or where the phone number or email address has changed since the previous date.
I currently do this by creating a temporary view and running a raw SQL query against it, like so:

df.createOrReplaceTempView("df")

df = spark.sql(
    """
    SELECT  a.*
    FROM (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date) AS row, id, phone_number, email, date FROM df) AS a
    LEFT JOIN (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date) AS row, id, phone_number, email, date FROM df) AS b
    ON a.row = b.row + 1 AND a.id = b.id
    WHERE a.phone_number != b.phone_number OR b.phone_number IS NULL OR a.email != b.email OR b.email IS NULL
    """
)

However, I would prefer to get the same result using pure PySpark functions. How can I translate this SQL query into PySpark?
This is what I have tried so far:

from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

a = df.withColumn("row", F.row_number().over(W.partitionBy("id").orderBy("date")))
b = df.withColumn("row", F.row_number().over(W.partitionBy("id").orderBy("date")))

df = a.join(b, on=[a.row == b.row + 1, a.id == b.id], how="left").where(
    (a.phone_number != b.phone_number)
    | (b.phone_number.isNull())
    | (a.email != b.email)
    | (b.email.isNull())
)
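
Since a and b are built from the same DataFrame, the joined result ends up with duplicate column names, and references like a.row can become ambiguous. A rough sketch of how it might work with explicit aliases (an assumption on my part, not necessarily the right approach):

# Sketch only: alias both sides of the self-join so the duplicated
# columns can be referenced unambiguously, then keep only the "a" side.
a = df.withColumn("row", F.row_number().over(W.partitionBy("id").orderBy("date"))).alias("a")
b = df.withColumn("row", F.row_number().over(W.partitionBy("id").orderBy("date"))).alias("b")

result = a.join(
    b,
    on=[F.col("a.row") == F.col("b.row") + 1, F.col("a.id") == F.col("b.id")],
    how="left",
).where(
    (F.col("a.phone_number") != F.col("b.phone_number"))
    | F.col("b.phone_number").isNull()
    | (F.col("a.email") != F.col("b.email"))
    | F.col("b.email").isNull()
).select("a.*")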

qncylg1j 1#

I would do this a bit differently. Rather than following the SQL, apply the business rules directly:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("id").orderBy("date")

df.withColumn(
    "rnk", F.row_number().over(w)
).withColumn(
    "old", F.lag(F.struct([F.col("phone_number"), F.col("email")])).over(w)
).where(
    (F.col("rnk") == 1)
    | (F.col("phone_number") != F.col("old.phone_number"))
    | (F.col("email") != F.col("old.email"))
).show()

+---+------------+-----------+----------+---+--------------------+
| id|phone_number|      email|      date|rnk|                 old|
+---+------------+-----------+----------+---+--------------------+
|  1|       12345|a@gmail.com|2020-01-01|  1|                null|
|  1|       23456|a@gmail.com|2020-01-03|  3|[12345, a@gmail.com]|
|  1|       34567|a@gmail.com|2020-01-04|  4|[23456, a@gmail.com]|
|  1|       12345|a@gmail.com|2020-01-05|  5|[34567, a@gmail.com]|
|  1|       45678|a@gmail.com|2020-01-06|  6|[12345, a@gmail.com]|
|  3|       78901|d@gmail.com|2020-01-01|  1|                null|
|  2|       56789|b@gmail.com|2020-01-01|  1|                null|
|  2|       56789|c@gmail.com|2020-01-03|  3|[56789, b@gmail.com]|
|  2|       67890|c@gmail.com|2020-01-04|  4|[56789, c@gmail.com]|
+---+------------+-----------+----------+---+--------------------+

Note: you can replace the test on rnk with F.col("old").isNull(), so you do not have to compute rnk at all.
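
As a minimal sketch, that simplified variant (dropping the helper column at the end) might look like this:

# The first row per id has no previous row, so F.col("old").isNull()
# replaces the rnk == 1 test and the row_number column is not needed.
w = Window.partitionBy("id").orderBy("date")

result = (
    df.withColumn("old", F.lag(F.struct("phone_number", "email")).over(w))
    .where(
        F.col("old").isNull()
        | (F.col("phone_number") != F.col("old.phone_number"))
        | (F.col("email") != F.col("old.email"))
    )
    .drop("old")
)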
