Python3.x—从一列中查找最大值,并基于最大值填充另一列

fwzugrvs  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(422)

我在csv文件中有增量加载。我在Dataframe中读取csv。Dataframe有一列包含一些字符串。我必须从这个列中找到不同的字符串并分配一个 ID (整数)到从 0 在加入另一个Dataframe之后。
在下一次运行中,我必须在找到 ID 列并为不同的字符串递增。在任何有空的地方 ID 列,我必须从上一次运行的值中增加(+1)。
第一次运行
字符串ID0第一个1第二个第三个第四个4
第二次运行 MAX(ID) = 4 字符串零第一个第二个第三个第四个第五个第六个第七个第八个
我试过了,但没能成功。。

max = df.agg({"ID": "max"}).collect()[0][0]
df_incremented = df.withcolumn("ID", when(col("ID").isNull(),expr("max += 1")))

让我知道是否有一个简单的方法来实现这一点。

rbl8hiat

rbl8hiat1#

由于只保留不同的值,因此可以使用 row_number 窗口上的函数:

from pyspark.sql import Window
from pyspark.sql import functions as F

 df = spark.createDataFrame(
    [("a",), ("a",), ("b",), ("c",), ("d",), ("e",), ("e",)],
    ("string",)
)

w = Window.orderBy("string")

df1 = df.distinct().withColumn("ID", F.row_number().over(w) - 1)

df1.show()

# +------+---+

# |string| ID|

# +------+---+

# |     a|  0|

# |     b|  1|

# |     c|  2|

# |     d|  3|

# |     e|  4|

# +------+---+

现在让我们在这个Dataframe中添加一些行并使用 row_number 随着 coalesce 分配 ID 仅适用于为空的行(无需获取最大值):

df2 = df1.union(spark.sql("select * from values ('f', null), ('h', null), ('i', null)"))

df3 = df2.withColumn("ID", F.coalesce("ID", F.row_number(w) - 1))

df3.show()

# +------+---+

# |string| ID|

# +------+---+

# |     a|  0|

# |     b|  1|

# |     c|  2|

# |     d|  3|

# |     e|  4|

# |     f|  5|

# |     h|  6|

# |     i|  7|

# +------+---+

如果您也希望保留重复的值并为它们指定相同的值 ID ,然后使用 dense_rank 而不是 row_number .

相关问题