Apache Spark groupby条件下的超前和滞后

7tofc5zh  于 2023-01-09  发布在  Apache
关注(0)|答案(2)|浏览(130)

对于信号数据集中的每个entity_id,查找具有最旧和最新month_iditem_id
在某些情况下,它可能是相同的项目。如果有2个不同的项目具有相同的month_id,然后采取较低的item_id的项目。
最后,将每个实体的信号计数相加,并输出为total_signals。正确的输出应为每个唯一的entity_id包含1行。

    • 输入:**
entity_id: long
item_id: integer
source: integer
month_id: integer
signal_count: integer
    • 输出:**
entity_id: long
oldest_item_id: integer
newest_item_id: integer
total_signals: integer

如何使用窗口函数超前和滞后特定的month_id列?
输入:

+---------+-------+------+------------+

|实体标识|项目标识|源|月份标识|信号计数|

+---------+-------+------+------------+

|小行星359781|小行星20001|小行星21000|小行星201705| 1个|

|小行星359781|小行星20001|小行星21000|小行星2017| 1个|

|小行星359781|三个|小行星21000|小行星201708|第二章|

|小行星359781|三个|小行星21000|小行星2017|第二章|

|小行星359781|三个|九七五|小行星2015|四个|

输出:

实体标识最旧项目标识最新项目标识总计信号

359781 3 3 23

152813413 1000 1000 2

224619015 0 3 12

使用scala api和spark Dataframe

vh0rcniy

vh0rcniy1#

在这种情况下,您不会使用lead()lag(),而是使用聚合。在这种情况下,您还需要一些条件聚合。
这可能是最好的方法:

select t.entity_id, t.num_signals,
       tmin.item_id as item_id_min_month,
       tmax.item_id as item_id_max_month
from (select t.entity_id, sum(signal_count) as num_signals,
             min(month_id) as min_month_id,
             max(month_id) as max_month_id
      from t
     ) t join
     t tmin
     on tmin.entity_id = t.entity_id and
        tmin.month_id = t.min_month_id join
     t tmax
     on tmax.entity_id = t.entity_id and
        tmax.month_id = t.max_month_id ;

如果您的SparkSQL版本中提供了first_value()last_value(),您也可以使用它们。

pkln4tw6

pkln4tw62#

您需要使用滞后和超前执行以下代码更改

val windowSpecLag = Window.partitionBy("entity_id").orderBy($"month_id".desc_nulls_first, $"item_id".asc_nulls_first)
val windowSpecLead = Window.partitionBy("entity_id").orderBy($"month_id".desc_nulls_first, $"item_id".desc_nulls_first)
val windowSpecSum = Window.partitionBy("entity_id")

val outputResultDf = fileContentAsDf
.withColumn("total_signals", sum(col("signal_count")).over(windowSpecSum))
.withColumn("lag", lag($"item_id", 1).over(windowSpecLag))
.withColumn("lead", lead($"item_id", 1).over(windowSpecLead))
.filter($"lag".isNull || $"lead".isNull)
.select("entity_id", "lag", "lead", "total_signals", "item_id")
.groupBy("entity_id")
.agg(last("item_id", ignoreNulls = true).alias("oldest_item_id"),
  first("item_id", ignoreNulls = true).alias("newest_item_id"),
  first("total_signals").alias("total_signals"))

相关问题