Scala Dataframe 操作的性能改进

w41d8nur  于 2022-11-23  发布在  Scala
关注(0)|答案(2)|浏览(191)

我正在使用一个按load_date列分区的表,并且每周使用delta optimize命令对其进行优化,作为我的用例的源数据集。
表架构如下所示:

+-----------------+--------------------+------------+---------+--------+---------------+
|               ID|          readout_id|readout_date|load_date|item_txt| item_value_txt|
+-----------------+--------------------+------------+---------+--------+---------------+

稍后将对该表的item_txtitem_value_txt列进行透视,并使用多个窗口函数应用许多操作,如下所示:

val windowSpec = Window.partitionBy("id","readout_date")
val windowSpec1 = Window.partitionBy("id","readout_date").orderBy(col("readout_id") desc)
val windowSpec2 = Window.partitionBy("id").orderBy("readout_date")
val windowSpec3 = Window.partitionBy("id").orderBy("readout_date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
val windowSpec4 = Window.partitionBy("id").orderBy("readout_date").rowsBetween(Window.unboundedPreceding, Window.currentRow-1)

这些窗口函数用于实现对数据的多重逻辑处理,甚至有很少的连接用于处理数据。
最终表使用readout_dateid进行分区,可以看出性能非常差,因为100id和100readout_date需要花费大量时间
如果我没有对最终表进行分区,我会得到下面的错误。

Job aborted due to stage failure: Total size of serialized results of 129 tasks (4.0 GiB) is bigger than spark.driver.maxResultSize 4.0 GiB.

预计生产中的id数量将达到数十亿,我预计在处理完整数据时会出现更多的限制和性能问题。
下面提供了群集配置和利用率指标。x1c 0d1x

请让我知道,如果有什么是错误的,而做重新分区,任何方法,以提高集群利用率,以提高性能...
任何线索谢谢!

643ylb08

643ylb081#

maxResultSize只是一个你可以增加的设置。但是它被设置为4Gigs是为了警告你你正在做 * 坏事 *,你应该优化你的工作。你正在做正确的事情,寻求帮助来优化。
如果你关心性能,我建议你首先去掉窗口。你使用的前3个窗口可以用Groupby实现,这样性能会更好。最后两个窗口肯定很难用Groupby来重新构造,但是通过对问题的重新组织,你也许能够做到。诀窍是使用多个查询而不是一个。你可能认为这样做会更糟,但是我认为这是一个好办法。我在这里告诉你,如果你能避免使用窗口,你几乎每次都能获得更好的性能。窗口不是坏事,它们是要使用的工具,但是它们在无界数据上不能很好地执行。(您是否可以做一些中间步骤来减少窗口需要检查的数据?)或者您是否可以使用聚合函数来完成工作而不必使用窗口?你应该探索你的选择。

mzaanser

mzaanser2#

根据您的其他答案,您应该按ID分组,而不是按ID窗口化。并且可能按年/月的周使用聚合(总和)。这可能会在损失一些粒度的情况下提供非常快速的性能。这将使您有足够的洞察力来决定是否深入研究。
如果您希望更准确,我建议使用:正在将空值转换为0。

val windowSpec1 = Window.partitionBy("id").orderBy(col("readout_date") asc) // asc is important as it flips the relationship so that it groups the previous nulls

然后创建一个SIG_XX VAL或任何你想查看的信号的运行总数。将新列称为'null-partitions'。
这将有效地允许您对数字进行分组(通过空分区),然后您可以使用group by来运行聚合函数以完成计算。Window和group by可以做同样的事情,Windows只是在移动数据方面花费更多的成本,从而降低了速度。Group by使用更多的簇来完成工作,并加快了过程。

相关问题