后台
“zstd”压缩编解码器有22个压缩级别。我读this Uber blog。关于压缩时间和文件大小,我用df.to_parquet
与我们的数据进行了验证,并得到了相同的实验结果。所以我希望在我们的AWS Glue Spark作业中将压缩级别设置为19,该作业还将数据写入Delta Lake。
实验一
我的AWS Glue作业使用的是“Glue 4.0 - Spark 3.3,Scala 2,Python 3”版本。
这是我的代码
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
format_options={},
connection_type="s3",
format="parquet",
connection_options={
"paths": [
"s3://my-bucket/data/raw-parquet/motor/"
],
"recurse": True,
},
transformation_ctx="S3bucket_node1",
)
additional_options = {
"path": "s3://my-bucket/data/delta-tables/motor/",
"mergeSchema": "true",
}
sink_to_delta_lake_node3_df = S3bucket_node1.toDF()
sink_to_delta_lake_node3_df.write.format("delta").options(**additional_options).mode(
"overwrite"
).save()
job.commit()
基于https://stackoverflow.com/a/74276832/2000548,我可能会使用--conf parquet.compression.codec.zstd.level=19
。(注意,写答案的作者说它似乎不工作。另一方面,Uber在博客中使它工作,所以我想可能有一种方法可以在Spark中正确设置“zstd”压缩级别)
下面是我的--conf
:
--conf spark.sql.parquet.compression.codec=zstd
--conf parquet.compression.codec.zstd.level=19
我在我的胶水作业中通过“作业详细信息->高级属性->作业参数”添加了这些配置:
- 关键字:
--conf
- 值:
spark.sql.parquet.compression.codec=zstd --conf parquet.compression.codec.zstd.level=19
(This是current way,用于在AWS Glue作业中设置多个--conf
,我已经验证了该作业之前的预期工作)
我与压缩级别3进行了比较。然而,压缩级别19和3在Delta表中生成了完全相同的parquet文件大小97 MB(97,002,126字节)。
为了确保不同的“zstd”压缩级别对这些数据有不同的大小,我尝试了Python代码:
df.to_parquet(
local_parquet_path,
engine="pyarrow",
compression="zstd",
compression_level=19
)
使用压缩级别19的文件大小是使用压缩级别3的文件大小的92%,这意味着对于这些数据,当压缩级别非常不同时,文件大小会有差异。所以我觉得Spark中的--conf parquet.compression.codec.zstd.level=19
没有像预期的那样工作。
如何在AWS Glue作业中设置“zstd”压缩级别?谢谢!
实验2 ~ 4(2023/9/29)
尝试这些组合:
--conf spark.sql.parquet.compression.codec=zstd
--conf parquet.compression.codec.zstd.level=19
--conf spark.sql.parquet.compression.codec=zstd
--conf spark.io.compression.codec=zstd
--conf spark.io.compression.zstd.level=19
--conf spark.sql.parquet.compression.codec=zstd
--conf parquet.compression.codec.zstd.level=19
--conf spark.io.compression.codec=zstd
--conf spark.io.compression.zstd.level=19
仍然得到完全相同的“zstd”Parquet文件大小在三角洲湖相比,没有设置任何压缩级别或设置为3。
实验五(9/29/2023)
如果只使用
--conf spark.io.compression.codec=zstd
--conf spark.io.compression.zstd.level=19
这实际上使最终的文件成为像part-00000-1c8c7408-b14f-4ba1-9030-ecc437a2f8d3-c000.snappy.parquet
这样的Snappy格式。这意味着spark.io.compression.codec=zstd
不能按预期工作。
1条答案
按热度按时间fjnneemd1#
spark.io.compression.codec
不用于写入生成的parquet文件-它用于写入压缩的内部数据,如RDD分区,事件日志,广播变量和shuffle输出(查看文档)。从链接的Jira中的讨论来看,这在Spark中似乎是不可配置的(但也许我需要检查源代码)。