Pyspark数据修复

jm81lzqq  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(90)

我有一个spark Dataframe ,当一个值有一个空记录时,我试图复制一个列中的值。
我尝试使用下面的代码:

from pyspark.sql import SparkSession, functions as F

data = [(8194, 'Closed'),
        (8194, ''),
        (8194, ''),
        (8196, 'Draft'),
        (8196, ''),
        (8197, 'open'),
        (8197, ''),
        (8197, '')]
base_df = spark.createDataFrame(data, ["id","status"])

window_spec = Window.partitionBy("id").orderBy(F.col("id"))
l_id = F.lag("status").over(window_spec)
base_df = base_df.withColumn("status", F.when((base_df["status"] == "") | base_df["status"].isNull(), l_id).otherwise(base_df["status"]))

base_df.show()

此代码仅更新一条记录。有人可以帮助我实现所需的样本输出。
Sample Input
Sample Output

izj3ouym

izj3ouym1#

您可以检查last函数here
默认情况下,函数返回它看到的最后一个值。当ignoreNulls被设置为true时,它将返回它看到的最后一个非空值。如果所有值都为null,则返回null。
请注意,在您的示例中,您使用“”而不是None创建了DataFrame,这将不起作用,因为“”被视为字符串。
检查此实现:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

data = [(8194, 'Closed'),
        (8194, None),
        (8194, None),
        (8196, 'Draft'),
        (8196, None),
        (8197, 'open'),
        (8197, None),
        (8197, None)]
base_df = spark.createDataFrame(data, ["id","status"])

base_df.show()

返回值为:

+----+------+
|  id|status|
+----+------+
|8194|Closed|
|8194|  null|
|8194|  null|
|8196| Draft|
|8196|  null|
|8197|  open|
|8197|  null|
|8197|  null|
+----+------+

在此基础上,看最后一个函数的实际应用:

window_spec = Window.partitionBy("id").orderBy(F.col("id"))

new_base_df =  base_df.withColumn("status", F.last("status", True).over(windowSpec))
new_base_df.show()

+----+------+
|  id|status|
+----+------+
|8194|Closed|
|8194|Closed|
|8194|Closed|
|8196| Draft|
|8196| Draft|
|8197|  open|
|8197|  open|
|8197|  open|
+----+------+

相关问题