Apache Spark 如何在AWS Glue作业中设置“zstd”压缩级别?

s4n0splo  于 2023-10-23  发布在  Apache
关注(0)|答案(1)|浏览(170)

后台

“zstd”压缩编解码器有22个压缩级别。我读this Uber blog。关于压缩时间和文件大小,我用df.to_parquet与我们的数据进行了验证,并得到了相同的实验结果。所以我希望在我们的AWS Glue Spark作业中将压缩级别设置为19,该作业还将数据写入Delta Lake。

实验一

我的AWS Glue作业使用的是“Glue 4.0 - Spark 3.3,Scala 2,Python 3”版本。
这是我的代码

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": [
            "s3://my-bucket/data/raw-parquet/motor/"
        ],
        "recurse": True,
    },
    transformation_ctx="S3bucket_node1",
)

additional_options = {
    "path": "s3://my-bucket/data/delta-tables/motor/",
    "mergeSchema": "true",
}
sink_to_delta_lake_node3_df = S3bucket_node1.toDF()
sink_to_delta_lake_node3_df.write.format("delta").options(**additional_options).mode(
    "overwrite"
).save()

job.commit()

基于https://stackoverflow.com/a/74276832/2000548,我可能会使用--conf parquet.compression.codec.zstd.level=19。(注意,写答案的作者说它似乎不工作。另一方面,Uber在博客中使它工作,所以我想可能有一种方法可以在Spark中正确设置“zstd”压缩级别)
下面是我的--conf

--conf spark.sql.parquet.compression.codec=zstd
--conf parquet.compression.codec.zstd.level=19

我在我的胶水作业中通过“作业详细信息->高级属性->作业参数”添加了这些配置:

  • 关键字:--conf
  • 值:spark.sql.parquet.compression.codec=zstd --conf parquet.compression.codec.zstd.level=19

(This是current way,用于在AWS Glue作业中设置多个--conf,我已经验证了该作业之前的预期工作)
我与压缩级别3进行了比较。然而,压缩级别19和3在Delta表中生成了完全相同的parquet文件大小97 MB(97,002,126字节)。
为了确保不同的“zstd”压缩级别对这些数据有不同的大小,我尝试了Python代码:

df.to_parquet(
  local_parquet_path,
  engine="pyarrow",
  compression="zstd",
  compression_level=19
)

使用压缩级别19的文件大小是使用压缩级别3的文件大小的92%,这意味着对于这些数据,当压缩级别非常不同时,文件大小会有差异。所以我觉得Spark中的--conf parquet.compression.codec.zstd.level=19没有像预期的那样工作。
如何在AWS Glue作业中设置“zstd”压缩级别?谢谢!

实验2 ~ 4(2023/9/29)

尝试这些组合:

--conf spark.sql.parquet.compression.codec=zstd
--conf parquet.compression.codec.zstd.level=19
--conf spark.sql.parquet.compression.codec=zstd
--conf spark.io.compression.codec=zstd
--conf spark.io.compression.zstd.level=19
--conf spark.sql.parquet.compression.codec=zstd
--conf parquet.compression.codec.zstd.level=19
--conf spark.io.compression.codec=zstd
--conf spark.io.compression.zstd.level=19

仍然得到完全相同的“zstd”Parquet文件大小在三角洲湖相比,没有设置任何压缩级别或设置为3。

实验五(9/29/2023)

如果只使用

--conf spark.io.compression.codec=zstd
--conf spark.io.compression.zstd.level=19

这实际上使最终的文件成为像part-00000-1c8c7408-b14f-4ba1-9030-ecc437a2f8d3-c000.snappy.parquet这样的Snappy格式。这意味着spark.io.compression.codec=zstd不能按预期工作。

fjnneemd

fjnneemd1#

spark.io.compression.codec不用于写入生成的parquet文件-它用于写入压缩的内部数据,如RDD分区,事件日志,广播变量和shuffle输出(查看文档)。
从链接的Jira中的讨论来看,这在Spark中似乎是不可配置的(但也许我需要检查源代码)。

相关问题