I have this table:
df = spark.createDataFrame(
    [
        (1, 12345, "a@gmail.com", "2020-01-01"),
        (1, 12345, "a@gmail.com", "2020-01-02"),
        (1, 23456, "a@gmail.com", "2020-01-03"),
        (1, 34567, "a@gmail.com", "2020-01-04"),
        (1, 12345, "a@gmail.com", "2020-01-05"),
        (1, 45678, "a@gmail.com", "2020-01-06"),
        (1, 45678, "a@gmail.com", "2020-01-07"),
        (2, 56789, "b@gmail.com", "2020-01-01"),
        (2, 56789, "b@gmail.com", "2020-01-02"),
        (2, 56789, "c@gmail.com", "2020-01-03"),
        (2, 67890, "c@gmail.com", "2020-01-04"),
        (2, 67890, "c@gmail.com", "2020-01-05"),
        (3, 78901, "d@gmail.com", "2020-01-01"),
        (3, 78901, "d@gmail.com", "2020-01-02"),
        (3, 78901, "d@gmail.com", "2020-01-03"),
    ],
    ["id", "phone_number", "email", "date"],
)
I want to select every row that is either the first date for its id, or where the phone number or email address has changed since the previous date.
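For the sample data above, that means keeping these rows:

(1, 12345, "a@gmail.com", "2020-01-01")  # first date for id 1
(1, 23456, "a@gmail.com", "2020-01-03")  # phone changed
(1, 34567, "a@gmail.com", "2020-01-04")  # phone changed
(1, 12345, "a@gmail.com", "2020-01-05")  # phone changed back
(1, 45678, "a@gmail.com", "2020-01-06")  # phone changed
(2, 56789, "b@gmail.com", "2020-01-01")  # first date for id 2
(2, 56789, "c@gmail.com", "2020-01-03")  # email changed
(2, 67890, "c@gmail.com", "2020-01-04")  # phone changed
(3, 78901, "d@gmail.com", "2020-01-01")  # first date for id 3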
I do this by creating a temporary view and then running a raw SQL query against it, like this:
df.createOrReplaceTempView("df")
df = spark.sql(
"""
SELECT a.*
FROM (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date) AS row, id, phone_number, email, date FROM df) AS a
LEFT JOIN (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date) AS row, id, phone_number, email, date FROM df) AS b
ON a.row = b.row + 1 AND a.id = b.id
WHERE a.phone_number != b.phone_number OR b.phone_number IS NULL OR a.email != b.email OR b.email IS NULL
"""
)
However, I would rather get the same result using pure PySpark functions. How can I translate this SQL query to PySpark?
This is what I have tried so far:
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

w = W.partitionBy("id").orderBy("date")
# alias both sides so the self-join columns can be referenced unambiguously
a = df.withColumn("row", F.row_number().over(w)).alias("a")
b = df.withColumn("row", F.row_number().over(w)).alias("b")
df = (
    a.join(
        b,
        on=[F.col("a.row") == F.col("b.row") + 1, F.col("a.id") == F.col("b.id")],
        how="left",
    )
    .where(
        (F.col("a.phone_number") != F.col("b.phone_number"))
        | F.col("b.phone_number").isNull()
        | (F.col("a.email") != F.col("b.email"))
        | F.col("b.email").isNull()
    )
    .select("a.*")  # mirrors SELECT a.* in the SQL
)
1 Answer
I would do it a bit differently: rather than following the SQL, apply the business rule directly.
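A sketch of that idea, assuming the previous row's values are pulled in with F.lag as a struct column old next to a row number rnk (both names referenced in the note below):

from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

w = W.partitionBy("id").orderBy("date")

result = (
    df.withColumn("rnk", F.row_number().over(w))
    # "old" holds the previous row's contact info for the same id (null on the first row)
    .withColumn("old", F.lag(F.struct("phone_number", "email")).over(w))
    .where(
        (F.col("rnk") == 1)  # first date for each id
        | (F.col("old.phone_number") != F.col("phone_number"))  # phone changed
        | (F.col("old.email") != F.col("email"))  # email changed
    )
    .drop("rnk", "old")
)
result.show()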
Note: you can replace the test on rnk with a test on old, F.col("old").isNull(), so you do not have to compute rnk at all.
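A minimal sketch of that variant, assuming the same window w and struct column old as above:

result = (
    df.withColumn("old", F.lag(F.struct("phone_number", "email")).over(w))
    .where(
        F.col("old").isNull()  # first row of each id has no previous row
        | (F.col("old.phone_number") != F.col("phone_number"))
        | (F.col("old.email") != F.col("email"))
    )
    .drop("old")
)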