对于信号数据集中的每个entity_id
,查找具有最旧和最新month_id
的item_id
。
在某些情况下,它可能是相同的项目。如果有2个不同的项目具有相同的month_id
,然后采取较低的item_id
的项目。
最后,将每个实体的信号计数相加,并输出为total_signals
。正确的输出应为每个唯一的entity_id
包含1行。
- 输入:**
entity_id: long
item_id: integer
source: integer
month_id: integer
signal_count: integer
- 输出:**
entity_id: long
oldest_item_id: integer
newest_item_id: integer
total_signals: integer
如何使用窗口函数超前和滞后特定的month_id
列?
输入:
+---------+-------+------+------------+
|实体标识|项目标识|源|月份标识|信号计数|
+---------+-------+------+------------+
|小行星359781|小行星20001|小行星21000|小行星201705| 1个|
|小行星359781|小行星20001|小行星21000|小行星2017| 1个|
|小行星359781|三个|小行星21000|小行星201708|第二章|
|小行星359781|三个|小行星21000|小行星2017|第二章|
|小行星359781|三个|九七五|小行星2015|四个|
输出:
实体标识最旧项目标识最新项目标识总计信号
359781 3 3 23
152813413 1000 1000 2
224619015 0 3 12
使用scala api和spark Dataframe
2条答案
按热度按时间vh0rcniy1#
在这种情况下,您不会使用
lead()
和lag()
,而是使用聚合。在这种情况下,您还需要一些条件聚合。这可能是最好的方法:
如果您的SparkSQL版本中提供了
first_value()
和last_value()
,您也可以使用它们。pkln4tw62#
您需要使用滞后和超前执行以下代码更改