Pyspark如何包含where条件失败的行

7lrncoxx  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(133)

我有一个排序表,看起来如下所示。
| 列1|第2列|
| - -|- -|
| 千元|千元|
| 小行星2600|小行星2600|
| 小行星3600|小行星3600|
| 小行星3600|小行星4050|
| 小行星3600|小行星4500|
我想创建一个标志,使col 1和col 2都小于4000时该标志为true。
pyspark_df = pyspark_df.withColumn('flag', when((pyspark_df['col1'] <= 4000) & (pyspark_df['col2'] <= 4000), 1).otherwise(0)
但是,我还希望失败的第一行(在本例中为第4行)也将此标志设置为true。

b0zn9rqh

b0zn9rqh1#

您可以创建一个滞后列,然后在两列之间使用按位OR。

from pyspark.sql.functions import when, lag, col, monotonically_increasing_id
from pyspark.sql.window import Window

df = spark.createDataFrame(
  [[1000,1000],
  [2600,2600],
  [3600,3600],
  [3600,4500],
  [3600,4500]],['col1','col2']
)

df = df.withColumn('flag', when((df['col1'] <= 4000) & (df['col2'] <= 4000), 1).otherwise(0))

df = df.withColumn('idx', monotonically_increasing_id())

w = Window().partitionBy().orderBy(col('idx'))
df = df.withColumn('lag', lag('flag', 1).over(w))
df = df.fillna(0, subset='lag')

df = df.withColumn('flag', df.flag.bitwiseOR(df.lag))
df.select('col1','col2','flag').show()

输出量

+----+----+----+
|col1|col2|flag|
+----+----+----+
|1000|1000|   1|
|2600|2600|   1|
|3600|3600|   1|
|3600|4500|   1|
|3600|4500|   0|
+----+----+----+

相关问题