Get the latest value or null within a time range

yjghlzjz  posted on 2021-05-27  in  Spark

I have a huge dataset.

| Date       | ID | Value |
+------------+----+-------+
| 10-10-2020 | 1  | 1     |
| 10-11-2020 | 1  | 2     |
| 10-12-2020 | 1  | 3     |
| 10-13-2020 | 1  | 4     |
| 10-10-2020 | 2  | 5     |
| 10-11-2020 | 2  | 6     |
| 10-12-2020 | 2  | 7     |
| 10-09-2020 | 3  | 8     |
| 10-08-2020 | 4  | 9     |

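As you can see, this sample contains 4 IDs with different date ranges.
I have special logic that uses the RangeBetween function. For simplicity, assume it is a plain sum over a specified time range.
What I need is to produce a result like this (explained below):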

| ID | Value sum (last 2 days) | Value sum (last 4 days) | Value sum (prev 2 days) | Value sum (prev 4 days) | Result (2 days) | Result (4 days) |
+----+-------------------------+-------------------------+-------------------------+-------------------------+-----------------+-----------------+
| 1  | 7 (3+4)                 | 10 (1+2+3+4)            | 5 (3+2)                 | 6 (3+2+1)               | 7               | 10              |
| 2  | 7                       | 18 (5+6+7)              | 11 (5+6)                | 11 (5+6)                | 7               | 18              |
| 3  | null                    | null                    | null                    | 8                       | null            | 0               |
| 4  | null                    | null                    | null                    | null                    | null            | null            | // excluded from the result

This example assumes today is 10-13-2020. For each ID, I need the sum of values over two ranges: 2 days and 4 days.

1. The table contains two calculations for each range: one ending today and one ending the day before (the "last X days" and "prev X days" columns).
2. If all values exist in a range, the result is simply the sum over that range (ID = 1).
3. If some values are missing in a range, treat them as zero (ID = 2).
4. If no values exist in the range, but there is at least one value in the same range shifted back by one day, assume there was a sum yesterday but none today, and set the result to zero (ID = 3).
5. If there are no values in the range or in the day-before range, do not include the row in the result set (ID = 4).
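Rules 2 to 5 can be written down as a small pure function, which makes the expected table easier to check. The following is only a minimal sketch in plain Scala (no Spark involved); `ResultRule`, `resolve` and `keepRow` are hypothetical names introduced for illustration, and `None` stands in for a SQL null.

```scala
object ResultRule {
  // `last` is the sum over the window ending today, `prev` the sum over the
  // same-length window ending yesterday. None means "no rows in that window".
  def resolve(last: Option[Long], prev: Option[Long]): Option[Long] =
    (last, prev) match {
      case (Some(s), _)    => Some(s)   // rules 2 and 3: use today's sum as-is
      case (None, Some(_)) => Some(0L)  // rule 4: had a sum yesterday, none today
      case (None, None)    => None      // nothing in either window
    }

  // Rule 5: a row stays in the result only if at least one result is defined.
  def keepRow(results: Option[Long]*): Boolean = results.exists(_.isDefined)
}
```

For ID 3 this gives `resolve(None, None) == None` for the 2-day range and `resolve(None, Some(8L)) == Some(0L)` for the 4-day range, so the row is kept; for ID 4 both results are `None` and `keepRow` returns `false`.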

Here is the code I have so far:

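open Microsoft.Spark.Sql
open Microsoft.Spark.Sql.Expressions

// The ordering key is epoch seconds, so range offsets are expressed in seconds.
let secondsPerDay = 86400L

// Today and yesterday.
let last2Days =
    Window
        .PartitionBy("ID")
        .OrderBy(Functions.Col("Date").Cast("timestamp").Cast("long"))
        .RangeBetween(-secondsPerDay, 0L)

// The same two-day window, shifted back by one day.
let prev2Days =
    Window
        .PartitionBy("ID")
        .OrderBy(Functions.Col("Date").Cast("timestamp").Cast("long"))
        .RangeBetween(-2L * secondsPerDay, -secondsPerDay)

df
    .WithColumn("last2daysSum", Functions.Sum("Value").Over(last2Days))
    .WithColumn("prev2daysSum", Functions.Sum("Value").Over(prev2Days))
    .WithColumn("result2Days", Functions.Col("last2daysSum"))
    .Where(Functions.Col("Date").EqualTo(Functions.Lit("10-13-2020")))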

This works for example #1 (via last2daysSum). My questions:

1. is there a simple way to get a proper result for #2 (the latest record within defined time range)?
2. combine the previous question and condition `if last = null && prev != null then 0 else if last = null && prev = null then null else last` - example #3?
3. how to exclude records as per example #4?

Can this be solved without a shuffle?

Answer #1, by 7jmck4yq

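For question #1: if you only need the calculation for one specific date, then groupBy with agg is simpler and faster to execute. The trick is to use when inside an aggregate function such as sum.
For questions #2 and #3: you can coalesce to zero, and before that filter out the rows that are entirely null. If the range you need to filter on is wider than the ranges you want to display (so that rows which had values earlier but have none now are still included), add an extra calculation over the longer period and use it only for filtering. See the code sample below.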

import org.apache.spark.sql.functions._
// Needed for toDF and the $"..." syntax; spark-shell imports this automatically.
import spark.implicits._

val data = Seq(
  ("2020-10-10", 1, 1),
  ("2020-10-11", 1, 2),
  ("2020-10-12", 1, 3),
  ("2020-10-13", 1, 4),
  ("2020-10-10", 2, 5),
  ("2020-10-11", 2, 6),
  ("2020-10-12", 2, 7),
  ("2020-10-09", 3, 8),
  ("2020-10-08", 4, 9)
).toDF("Date", "ID", "Value").withColumn("Date", to_date($"Date"))

// Conditional sum: only rows whose Date falls in
// [now - (start - 1) days, now - end days] contribute; the rest yield null,
// which sum ignores. A group with no matching rows sums to null.
def sumLastNDays(now: java.sql.Timestamp, start: Int, end: Int = 0) = 
  sum(when($"Date".between(date_sub(lit(now), start-1), date_sub(lit(now), end)), $"Value"))

val now = java.sql.Timestamp.valueOf("2020-10-13 00:00:00")

data
  .groupBy($"ID")
  .agg(
    sumLastNDays(now, 2).as("last2DaysSum"),
    sumLastNDays(now, 4).as("last4DaysSum"),
    sumLastNDays(now, 4, 2).as("prev2DaysSum"),
    // Wider range, used only to decide which IDs to keep.
    sumLastNDays(now, 5).as("last5DaysSum")
  )
  // Drop IDs with no value at all in the last 5 days (ID 4).
  .filter($"last5DaysSum".isNotNull)
  .drop($"last5DaysSum")
  // Replace the remaining nulls with zero.
  .withColumn("last4DaysSum", coalesce($"last4DaysSum", lit(0)))
  .withColumn("last2DaysSum", coalesce($"last2DaysSum", lit(0)))
  .withColumn("prev2DaysSum", coalesce($"prev2DaysSum", lit(0)))
  .orderBy($"ID")
  .show()

Result:

+---+------------+------------+------------+
| ID|last2DaysSum|last4DaysSum|prev2DaysSum|
+---+------------+------------+------------+
|  1|           7|          10|           3|
|  2|           7|          18|          11|
|  3|           0|           0|           0|
+---+------------+------------+------------+

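Note: I am not sure whether by prev2days you mean the 2-day interval immediately before the current 2-day interval, or the last-2-days interval as of yesterday. In the expected result table, ID 1 sums Oct 11 to 12 for prev2days, while ID 2 sums Oct 10 to 11. Either way, you can adjust the range parameters to get what you need. I assumed prev2days does not overlap last2days; change it to sumLastNDays(now, 3, 1) if you want the two-day ranges to overlap.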
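To see which dates each parameter choice actually covers, here is a minimal sketch in plain Scala that mirrors the inclusive bounds used by sumLastNDays. `WindowBounds` and `bounds` are hypothetical names for illustration only; the sketch uses `java.time.LocalDate` rather than Spark columns.

```scala
import java.time.LocalDate

object WindowBounds {
  // Same arithmetic as sumLastNDays: the window runs from
  // (now - (start - 1) days) to (now - end days), both ends inclusive.
  def bounds(now: LocalDate, start: Int, end: Int = 0): (LocalDate, LocalDate) =
    (now.minusDays(start - 1L), now.minusDays(end.toLong))
}

object WindowBoundsDemo extends App {
  val now = LocalDate.of(2020, 10, 13)
  println(WindowBounds.bounds(now, 2))    // (2020-10-12,2020-10-13)  last 2 days
  println(WindowBounds.bounds(now, 4, 2)) // (2020-10-10,2020-10-11)  non-overlapping prev 2 days
  println(WindowBounds.bounds(now, 3, 1)) // (2020-10-11,2020-10-12)  overlapping prev 2 days
}
```

With `(4, 2)` the previous window ends where the current one begins, which matches the ID 2 row of the expected table; with `(3, 1)` it is the current window shifted back by one day, which matches the ID 1 row.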
