我有一系列约30个数据集,所有这些数据集都需要连接在一起,以形成一个广泛的最终表。最后一个表需要5年的时间(每年一个表)并将它们合并在一起,然后将此完整历史与其他表的完整历史(类似地合并)合并,形成一个大的、历史的、宽表。
这些每年第一次的表格的布局如下:
table_type_1:
| primary_key | year |
|-------------|------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
其他年份表如下:
table_type_1:
| primary_key | year |
|-------------|------|
| key_1 | 1 |
| key_2 | 1 |
然后将它们联合起来创建:
table_type_1:
| primary_key | year |
|-------------|------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
| key_1 | 1 |
| key_2 | 1 |
类似地,第二种类型的表在联合时会产生以下结果:
table_type_2:
| primary_key | year |
|-------------|------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
| key_1 | 1 |
| key_2 | 1 |
我现在想加入 table_type_1
与 table_type_2
在 primary_key
以及 year
让出一张更宽的table。我注意到这个最后的连接需要很长的时间,并且会洗牌很多数据。
我怎样才能让这更快?
2条答案
按热度按时间e0uiprwp1#
我给你的建议是:在小数据集上建立第一个联合,然后广播数据集,第一个联合的结果,spark将把数据集部署在它的不同节点上,这将减少随机数。spark上的union经过了很好的优化,因此您需要做的是考虑以下问题:只从一开始选择您需要的列,避免在union之前执行任何类型的非成本有效的操作,如groupbykey…等等,因为spark在生成最终进程时会调用这些操作。我建议您避免使用hive,因为它使用map reduce策略,与spark sql相比,这是不值得的。您可以使用此函数示例只需更改键,如果可以,请使用scala它将直接与spark交互:
wtlkbnrh2#
你可以在每年的表上使用扣环
primary_key
以及year
将列放入完全相同数量的bucket中,以避免在计算最终联接时进行昂贵的交换。注意:当您选择
BUCKET_COUNT
价值观,重要的是要明白它应该为最终的目标而优化all_tables
输出,而不是中间表。这意味着您可能会得到对于中间表来说非常小的文件。这可能是无关紧要的相比,效率的提高all_tables
输出,因为在连接所有东西时,不必计算大量的交换;你的桶将预先计算,你可以简单地SortMergeJoin
在输入文件上。对于一个如何编写转换并写出指定数量的bucket的显式示例,我在这里的答案可能很有用。