如何使我的多连接/多联合数据集计算更快?

mum43rcc  于 2021-05-19  发布在  Spark
关注(0)|答案(2)|浏览(508)

我有一系列约30个数据集,所有这些数据集都需要连接在一起,以形成一个广泛的最终表。最后一个表需要5年的时间(每年一个表)并将它们合并在一起,然后将此完整历史与其他表的完整历史(类似地合并)合并,形成一个大的、历史的、宽表。
这些每年第一次的表格的布局如下:

table_type_1:

| primary_key | year |
|-------------|------|
| key_1       | 0    |
| key_2       | 0    |
| key_3       | 0    |

其他年份表如下:

table_type_1:

| primary_key | year |
|-------------|------|
| key_1       | 1    |
| key_2       | 1    |

然后将它们联合起来创建:

table_type_1:

| primary_key | year |
|-------------|------|
| key_1       | 0    |
| key_2       | 0    |
| key_3       | 0    |
| key_1       | 1    |
| key_2       | 1    |

类似地,第二种类型的表在联合时会产生以下结果:

table_type_2:

| primary_key | year |
|-------------|------|
| key_1       | 0    |
| key_2       | 0    |
| key_3       | 0    |
| key_1       | 1    |
| key_2       | 1    |

我现在想加入 table_type_1table_type_2primary_key 以及 year 让出一张更宽的table。我注意到这个最后的连接需要很长的时间,并且会洗牌很多数据。
我怎样才能让这更快?

e0uiprwp

e0uiprwp1#

我给你的建议是:在小数据集上建立第一个联合,然后广播数据集,第一个联合的结果,spark将把数据集部署在它的不同节点上,这将减少随机数。spark上的union经过了很好的优化,因此您需要做的是考虑以下问题:只从一开始选择您需要的列,避免在union之前执行任何类型的非成本有效的操作,如groupbykey…等等,因为spark在生成最终进程时会调用这些操作。我建议您避免使用hive,因为它使用map reduce策略,与spark sql相比,这是不值得的。您可以使用此函数示例只需更改键,如果可以,请使用scala它将直接与spark交互:

def map_To_cells(df1: DataFrame, df2: DataFrame): DataFrame = {
val df0= df2.withColumn("key0",F.col("key")).drop("key")
df1.as("main").join(
broadcast(df0),
df0("key0") <=> df("key")
).select( needed columns)
}
wtlkbnrh

wtlkbnrh2#

你可以在每年的表上使用扣环 primary_key 以及 year 将列放入完全相同数量的bucket中,以避免在计算最终联接时进行昂贵的交换。

- output: table_type_1_year_0
  input: raw_table_type_1_year_0
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
- output: table_type_1_year_1
  input: raw_table_type_1_year_1
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
...
- output: table_type_2_year_0
  input: raw_table_type_2_year_0
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
- output: table_type_2_year_1
  input: raw_table_type_2_year_1
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
...
- output: all_tables
  input:
    - table_type_1_year_0
    - table_type_1_year_1
...
    - table_type_2_year_0
    - table_type_2_year_1
...
  hive_partitioning: none
  bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)

注意:当您选择 BUCKET_COUNT 价值观,重要的是要明白它应该为最终的目标而优化 all_tables 输出,而不是中间表。这意味着您可能会得到对于中间表来说非常小的文件。这可能是无关紧要的相比,效率的提高 all_tables 输出,因为在连接所有东西时,不必计算大量的交换;你的桶将预先计算,你可以简单地 SortMergeJoin 在输入文件上。
对于一个如何编写转换并写出指定数量的bucket的显式示例,我在这里的答案可能很有用。

相关问题