Update a PySpark DataFrame column based on the min/max timestamp value in another column

0dxa2lsx · posted 2021-05-27 in Spark

I have the following DataFrame:

    col1    col2
    1       2020-02-27 15:00:00
    1       2020-02-27 15:04:00
I need the output to be:

    col1    col2                col3
    1       2020-02-27 15:00
    1       2020-02-27 15:04    Y

col3 must be populated with Y for the row holding the maximum timestamp value in col2, and null otherwise.
I tried the following:

    df = spark.sql("select col1,col2 from table")
    max_ts = df.select(max("col2")).show()
    y = (f.when(f.col('col2') == max_ts, "Y"))
    df1 = df.withColumn('col3', y)

The above only produces null output.
Could you suggest a possible solution, or point out the mistake?
TIA.
Edit: I need to group by col1 and take the maximum of col2 within each group.

guykilcj (answer #1)

Maybe this helps -

### DSL API: `max(..).over(window)`

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

df2.show(false)
df2.printSchema()
/**
 * +----+-------------------+
 * |col1|col2               |
 * +----+-------------------+
 * |1   |2020-02-27 15:00:00|
 * |1   |2020-02-27 15:04:00|
 * +----+-------------------+
 *
 * root
 *  |-- col1: integer (nullable = true)
 *  |-- col2: timestamp (nullable = true)
 */

val w = Window.partitionBy("col1")
df2.withColumn("col3",
  when(max("col2").over(w).cast("long") - col("col2").cast("long") === 0, "Y")
)
  .show(false)

/**
  * +----+-------------------+----+
  * |col1|col2               |col3|
  * +----+-------------------+----+
  * |1   |2020-02-27 15:00:00|null|
  * |1   |2020-02-27 15:04:00|Y   |
  * +----+-------------------+----+
  */
```
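
Since the question uses the Python API, here is a rough PySpark equivalent of the same window idea; a sketch assuming the sample data from the question (the names `df`, `df1`, `w` are illustrative). As a side note, the original attempt fails because `df.select(max("col2")).show()` only prints and returns `None`, so the `when` condition compares against `None` and can never be true; the window approach avoids collecting the max altogether.

```python
from pyspark.sql import SparkSession, functions as f
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Sample data mirroring the question, with col2 cast to timestamp
df = spark.createDataFrame(
    [(1, "2020-02-27 15:00:00"), (1, "2020-02-27 15:04:00")],
    ["col1", "col2"],
).withColumn("col2", f.col("col2").cast("timestamp"))

# Per-col1 window: the row whose col2 equals the group maximum gets "Y";
# with no otherwise() clause, every other row stays null
w = Window.partitionBy("col1")
df1 = df.withColumn("col3", f.when(f.col("col2") == f.max("col2").over(w), "Y"))

df1.show(truncate=False)
# +----+-------------------+----+
# |col1|col2               |col3|
# +----+-------------------+----+
# |1   |2020-02-27 15:00:00|null|
# |1   |2020-02-27 15:04:00|Y   |
# +----+-------------------+----+
```

Comparing the timestamps directly works here; casting to long as in the Scala snippet gives the same result.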

### Spark SQL

```scala
df2.createOrReplaceTempView("table")
spark.sql(
  """
    | select col1, col2,
    |   case when (cast(max(col2) over (partition by col1) as long) - cast(col2 as long) = 0) then 'Y' end as col3
    | from table
    """.stripMargin)
  .show(false)

/**
  * +----+-------------------+----+
  * |col1|col2               |col3|
  * +----+-------------------+----+
  * |1   |2020-02-27 15:00:00|null|
  * |1   |2020-02-27 15:04:00|Y   |
  * +----+-------------------+----+
  */
```
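
The same query can also be issued from PySpark, which is closest to the `spark.sql(...)` call in the original attempt. A minimal sketch, assuming `df` from the Python snippet above is registered as the temp view `table`; comparing the timestamps directly replaces the cast-to-long trick used in the Scala version:

```python
df.createOrReplaceTempView("table")

# Window function inside the CASE expression flags the latest row per col1
df1 = spark.sql("""
    select col1, col2,
           case when col2 = max(col2) over (partition by col1) then 'Y' end as col3
    from table
""")
df1.show(truncate=False)
# Same output as above: the row with the max col2 per col1 gets 'Y', the rest are null.
```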
