如何对行\号()列应用转换

tsm1rwdh  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(342)

需要应用转换,以便将分区组的created\u at字段的第一个值添加到整个分区组的新列startdate中。
第二,每当tg为“type”和“key”的相同值而改变时,在新列的字段中创建的列应成为其上具有相同“type”和“key”的行的结束日期,否则它将保持为空。

type             key         tg      created_at       timestamp       row_number

device_id    essentials    template   1600269347   2020-09-21 19:08:05      1                           
device_id    experiment      t1       1599721314   2020-09-17 01:37:17      1                                                    
device_id    experiment      v1       1600228007   2020-09-21 18:07:53      2
device_id    experiment      c1       1605221085   2020-09-21 18:07:53      3
test         t_key           t1       1599714939   2020-09-16 01:37:55      1
test         t_key           t2       1600084857   2020-09-21 17:08:23      2

迄今为止应用的步骤-:val windowspec=window.partitionby(“type”,“key”).orderby(“timestamp”)test.withcolumn(“row\u number”,row\u number.over(windowspec)).show()
预期输出-:

type        key         tg      created_at       timestamp     row_number startDate  endDate

device_id  essentials template 1600269347   2020-09-21 19:08:05  1        1600269347  null                
device_id  experiment   t1     1599721314   2020-09-17 01:37:17  1        1599721314  1600228007                                      
device_id  experiment   v1     1600228007   2020-09-21 18:07:53  2        1599721314  1605221085
device_id  experiment   c1     1605221085   2020-09-21 18:07:53  3        1599721314  null
test       t_key        t1     1599714939   2020-09-16 01:37:55  1        1599714939  1600084857
test       t_key        t2     1600084857   2020-09-21 17:08:23  2        1599714939  null

对如何进行有什么建议吗?

szqfcxe2

szqfcxe21#

你可以用 first 在窗口上获取创建的第一个值。 min 在这种情况下也可以。
第二个有点棘手。你需要使用 lag 请记住,对于窗口中的最后一行,滞后于窗口的结果将始终为空。

val schema =  List(
  StructField("type", StringType, true),
  StructField("key", StringType, true),
  StructField("tg", StringType, true),
  StructField("created_at", IntegerType, true),
  StructField("timestamp", TimestampType, true),
  StructField("row_number", IntegerType, true)
)

val data =  Seq(
    Row("device_id", "essentials", "template", 1600269347, Timestamp.valueOf("2020-09-21 19:08:05"), 1),
    Row("device_id", "experiment", "t1", 1599721314, Timestamp.valueOf("2020-09-17 01:37:17"), 1),  
    Row("device_id", "experiment", "v1", 1600228007, Timestamp.valueOf("2020-09-21 18:07:53"), 2),
    Row("device_id", "experiment", "c1", 1605221085, Timestamp.valueOf("2020-09-21 18:07:53"), 3),
    Row("test", "t_key", "t1", 1599714939, Timestamp.valueOf("2020-09-16 01:37:55"), 1),
    Row("test", "t_key", "t2", 1600084857, Timestamp.valueOf("2020-09-21 17:08:23"), 2)
  )

val test = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))

val windowSpec = Window.partitionBy("type","key").orderBy("timestamp")
test
.withColumn("startDate", first(col("created_at")).over(windowSpec))
.withColumn("endDate", when(
  lead(col("tg"), 1).over(windowSpec).isNotNull && 
  lead(col("tg"), 1).over(windowSpec) =!= col("tg"), 
  lead(col("created_at"), 1).over(windowSpec)
).otherwise(lit(null).cast(IntegerType)))
.show()
+---------+----------+--------+----------+-------------------+----------+----------+----------+
|     type|       key|      tg|created_at|          timestamp|row_number| startDate|   endDate|
+---------+----------+--------+----------+-------------------+----------+----------+----------+
|device_id|essentials|template|1600269347|2020-09-21 19:08:05|         1|1600269347|      null|
|device_id|experiment|      t1|1599721314|2020-09-17 01:37:17|         1|1599721314|1600228007|
|device_id|experiment|      v1|1600228007|2020-09-21 18:07:53|         2|1599721314|1605221085|
|device_id|experiment|      c1|1605221085|2020-09-21 18:07:53|         3|1599721314|      null|
|     test|     t_key|      t1|1599714939|2020-09-16 01:37:55|         1|1599714939|1600084857|
|     test|     t_key|      t2|1600084857|2020-09-21 17:08:23|         2|1599714939|      null|
+---------+----------+--------+----------+-------------------+----------+----------+----------+

相关问题