给定一个Dataframe,我试图计算在过去30天内我看到一个emailid的次数。我的函数的主要逻辑如下:
val new_df = df
.withColumn("transaction_timestamp", unix_timestamp($"timestamp").cast(LongType))
val winSpec = Window
.partitionBy("email")
.orderBy(col("transaction_timestamp"))
.rangeBetween(-NumberOfSecondsIn30Days, Window.currentRow)
val resultDF = new_df
.filter(col("condition"))
.withColumn("count", count(col("email")).over(winSpec))
配置:
spark.executor.cores=5
因此,我可以看到5个阶段中有窗口功能,其中一些阶段完成得非常快(几秒钟内),还有2个甚至没有在3小时内完成,被困在最后几个任务中(进展非常缓慢):
这是一个数据倾斜的问题,如果我删除所有包含5个最高频率的行 email
从数据集中,作业很快完成(不到5分钟)。
如果尝试在window partitionby中使用其他键,作业将在几分钟内完成:
Window.partitionBy("email", "date")
但很明显,如果我这样做,它会执行错误的计数计算,这不是一个可接受的解决方案。
我尝试过其他各种Spark设置抛出更多的内存,核心,并行等,而这些似乎都没有帮助。
spark版本:2.2
当前spark配置:
-执行器内存:100g
-执行器核心:5
-驱动器内存:80g
-spark.executor.memory=100克
使用16核128GB内存的机器。最多500个节点。
解决这个问题的正确方法是什么?
更新:为了提供更多上下文,这里是原始Dataframe和相应的计算Dataframe:
val df = Seq(
("a@gmail.com", "2019-10-01 00:04:00"),
("a@gmail.com", "2019-11-02 01:04:00"),
("a@gmail.com", "2019-11-22 02:04:00"),
("a@gmail.com", "2019-11-22 05:04:00"),
("a@gmail.com", "2019-12-02 03:04:00"),
("a@gmail.com", "2020-01-01 04:04:00"),
("a@gmail.com", "2020-03-11 05:04:00"),
("a@gmail.com", "2020-04-05 12:04:00"),
("b@gmail.com", "2020-05-03 03:04:00")
).toDF("email", "transaction_timestamp")
val expectedDF = Seq(
("a@gmail.com", "2019-10-01 00:04:00", 1),
("a@gmail.com", "2019-11-02 01:04:00", 1), // prev one falls outside of last 30 days win
("a@gmail.com", "2019-11-22 02:04:00", 2),
("a@gmail.com", "2019-11-22 05:04:00", 3),
("a@gmail.com", "2019-12-02 03:04:00", 3),
("a@gmail.com", "2020-01-01 04:04:00", 1),
("a@gmail.com", "2020-03-11 05:04:00", 1),
("a@gmail.com", "2020-04-05 12:04:00", 2),
("b@gmail.com", "2020-05-03 03:04:00", 1) // new email
).toDF("email", "transaction_timestamp", count")
3条答案
按热度按时间dwthyt8l1#
你是对的,这是一个数据倾斜的问题,减少窗口大小将有很大帮助。要想获得过去30天的信息,你不需要等到时代的开始。同样,如果构建一个带有时间索引的窗口,那么在每个窗口的开始处的计算将是错误的,因为它将无法访问上一个窗口。
我建议构建一个索引,每30天递增一次,两个重叠窗口大小为60天,如下图所示:
为了理解这是如何工作的,让我们考虑如图所示的一个数据点
index=2
. 如果您有一个30天大小的窗口,它将需要访问其窗口内和前一个窗口内的数据。那是不可能的。这就是为什么我们建立更大的窗口,以便我们可以访问所有的数据。如果我们考虑win1
,我们的问题与30天大小的索引相同。如果我们考虑win2
但是,索引1的窗口中提供了所有数据。对于索引为3的点,我们将使用
win1
. 对于索引为4的点,win2
基本上,对于偶数指数,我们使用win2
. 对于奇数索引,我们使用win1
. 这种方法将大大减少最大分区大小,从而减少单个任务中处理的最大数据量。代码只是上面解释的翻译:
uqdfh47h2#
你的一些分区可能太大,这是由于事实上,有些电子邮件,有太多的数据在一个月。
要解决这个问题,您可以创建一个新的Dataframe,其中只包含电子邮件和时间戳。然后,通过电子邮件和时间戳分组,计算行数,并计算窗口中的数据,希望能少得多。如果时间戳趋向于重复,即
df.count
远大于df.select("email", "timestamp").distinct.count
. 如果不是这样,您可以截断时间戳,但代价是丢失一些精度。这样,您就不用计算过去30天内发生的次数(因为时间戳是以秒为单位的,所以只需一秒钟),而是根据需要计算发生的次数(一分钟、一小时甚至一天)。你会损失一点精度,但会大大加快计算速度。精度越高,速度就越快。代码如下所示:
6pp0gazn3#
我们还是可以避开Windows的
对于上述df
说明:
根据“email”制作成对的时间戳(join on email)
比较每一对并检查它是否在过去30天内:如果是,则标记为1或0
根据“电子邮件”和“交易时间戳”汇总计数
假设:(电子邮件,事务时间戳)是不同的。如果不是,我们可以通过添加单调递增ID来处理