如何更快地计算foundry“最新版本”数据集?

fjaof16o  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(474)

我有一个数据集接收对数据行的最新编辑,但它只接收最近编辑的版本(i、 它是一个增量的 update_ts 时间戳列)。
原始表格:

| primary_key | update_ts |
|-------------|-----------|
| key_1       | 0         |
| key_2       | 0         |
| key_3       | 0         |

表更新时:

| primary_key | update_ts |
|-------------|-----------|
| key_1       | 0         |
| key_2       | 0         |
| key_3       | 0         |
| key_1       | 1         |
| key_2       | 1         |
| key_1       | 2         |

摄取之后,我需要计算所有先前更新的“最新版本”,同时还要考虑任何新编辑。
这意味着我每次都进行增量摄取并运行快照输出。对于我的构建来说,这是非常慢的,因为我已经注意到,每当我想要计算数据的最新版本时,我必须查看所有的输出行。
事务n=1(快照):

| primary_key | update_ts |
|-------------|-----------|
| key_1       | 0         |
| key_2       | 0         |
| key_3       | 0         |

事务n=2(追加):

| primary_key | update_ts |
|-------------|-----------|
| key_1       | 1         |
| key_2       | 1         |

如何使这个“最新版本”的计算速度更快?

mkshixfv

mkshixfv1#

这是一个常见的模式,将受益于扣。
其要点是:将输出快照基于 primary_key 列,在该列中,将完全跳过洗牌大得多的输出的昂贵步骤。
这意味着您只需将新数据交换到已包含以前历史的存储桶。
让我们从初始状态开始,在该状态下,我们在先前计算的“最新”版本上运行,该版本是一个缓慢的快照:

- output: raw_dataset
  input: external_jdbc_system
  hive_partitioning: none
  bucketing: none
  transactions:
    - SNAPSHOT
    - APPEND
    - APPEND
- output: clean_dataset
  input: raw_dataset
  hive_partitioning: none
  bucketing: none
  transactions:
    - SNAPSHOT
    - SNAPSHOT
    - SNAPSHOT

如果我们写出来 clean_dataset 在地板上使用扣环 primary_key 列转换为单独计算的桶数,以符合我们预期的数据规模,我们需要以下代码:

from transforms.api import transform, Input, Output
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

@transform(
    my_output=Output("/datasets/clean_dataset"),
    my_input=Input("/datasets/raw_dataset")
)
def my_compute_function(my_input, my_output):

    BUCKET_COUNT = 600
    PRIMARY_KEY = "primary_key"
    ORDER_COL = "update_ts"

    updated_keys = my_input.dataframe("added")
    last_written = my_output.dataframe("current")

    updated_keys.repartition(BUCKET_COUNT, PRIMARY_KEY)

    value_cols = [x for x in last_written.columns if x != PRIMARY_KEY]

    updated_keys = updated_keys.select(
      PRIMARY_KEY,
      *[F.col(x).alias("updated_keys_" + x) for x in value_cols]
    )

    last_written = last_written.select(
      PRIMARY_KEY,
      *[F.col(x).alias("last_written_" + x) for x in value_cols]
    )

    all_rows = updated_keys.join(last_written, PRIMARY_KEY, "fullouter")

    latest_df = all_rows.select(
      PRIMARY_KEY,
      *[F.coalesce(
          F.col("updated_keys_" + x),
          F.col("last_written_" + x)
        ).alias(x) for x in value_cols]
    )

    my_output.set_mode("replace")

    return my_output.write_dataframe(
        latest_df,
        bucket_cols=PRIMARY_KEY,
        bucket_count=BUCKET_COUNT,
        sort_by=ORDER_COL
    )

运行此命令时,您将注意到在查询计划中,项目在输出上的步骤不再包括交换,这意味着它将不会洗牌该数据。您现在看到的唯一交换是输入,它需要以与格式化输出完全相同的方式分发更改(这是一个非常快速的操作)。
这种交换随后被保存到 fullouter join步骤,然后join将利用这个漏洞,非常快地运行600个任务。最后,我们通过在相同的列上显式地bucketing到相同数量的bucket来维护输出的格式。
注意:使用这种方法,每个bucket中的文件大小会随着时间的推移而增长,而不会考虑增加bucket计数以保持大小合适的需要。使用这种技术,您最终将达到一个阈值,即文件大小超过128mb,并且您不再有效地执行(解决方法是使 BUCKET_COUNT 值)。
您的输出现在如下所示:

- output: raw_dataset
  input: external_jdbc_system
  hive_partitioning: none
  bucketing: none
  transactions:
    - SNAPSHOT
    - APPEND
    - APPEND
- output: clean_dataset
  input: raw_dataset
  hive_partitioning: none
  bucketing: BUCKET_COUNT by PRIMARY_KEY
  transactions:
    - SNAPSHOT
    - SNAPSHOT
    - SNAPSHOT

相关问题