spark作业,带有gzip格式的大文本文件

sirbozc5  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(434)

我正在运行spark作业,处理输入文件花费的时间太长。输入文件是6.8gb的gzip格式,包含110m的文本行。我知道它是gzip格式的,所以它是不可拆分的,只有一个执行器将用于读取该文件。
作为调试过程的一部分,我决定看看将gzip文件转换为parquet需要多长时间。我的想法是,一旦我转换成parquet文件,然后如果我在这个文件上运行我原来的spark作业,在这种情况下,它将使用多个执行器,输入文件将被并行处理。
但即使是小工作也比我预期的要花很长时间。这是我的密码:

val input = sqlContext.read.text("input.gz")
input.write.parquet("s3n://temp-output/")

当我在笔记本电脑(16gbram)中提取这个文件时,只花了不到2分钟的时间。当我在spark cluster上运行它时,我的期望是,由于我使用的executor内存是58GB,所以它将花费相同甚至更少的时间。花了大约20分钟。
我错过了什么?我很抱歉,如果这听起来相当业余,但我是相当新的Spark。
在gzip文件上运行spark作业的最佳方法是什么?假设我没有以其他文件格式(bzip2、snappy、lzo)创建该文件的选项。

szqfcxe2

szqfcxe21#

在执行输入-处理-输出类型的spark作业时,需要考虑三个独立的问题:
输入并行性
处理并行性
输出并行性
在您的例子中,输入并行度是1,因为在您的问题中,您声明不能更改输入格式或粒度。
你也基本上没有做任何处理,所以你不能得到任何收益。
但是,您可以控制输出并行性,这将给您带来两个好处:
多个CPU将写入,从而减少写入操作的总时间。
您的输出将被拆分为多个文件,以便在以后的处理中利用输入并行性。
为了增加并行性,必须增加分区的数量,这可以通过 repartition() ,例如。,

val numPartitions = ...
input.repartition(numPartitions).write.parquet("s3n://temp-output/")

当选择最佳分区数时,有许多不同的因素需要考虑。
数据大小
分区倾斜
群集ram大小
群集中的核心数
后续处理的类型
用于后续处理的集群(ram和内核)的大小
正在写入的系统
如果不知道你的目标和限制,很难提出一个可靠的建议,但这里有几个通用的指导方针:
因为你的分区不会倾斜(上面的 repartition 如果将分区数设置为executor核心数(假设您使用的节点具有足够的i/o),那么您将获得最快的吞吐量。
当您处理数据时,您真的希望整个分区能够“适合”分配给单个executor核心的ram。这里“fit”的意思取决于您的处理过程。如果你正在做一个简单的 map 转换时,数据可以流式传输。如果您正在做一些涉及订购的事情,那么ram需要大幅增长。如果您使用的是spark 1.6+,您将获得更灵活的内存管理的好处。如果你使用的是较早的版本,你就必须更加小心。当spark必须开始对磁盘进行“缓冲”时,作业执行就会停止。磁盘大小和内存大小可能非常非常不同。后者根据您处理数据的方式以及spark可以从 predicate 下推(parquet支持这一点)中获得多少好处而有所不同。使用spark ui查看各个作业阶段需要多少ram。
顺便说一句,除非您的数据具有非常特定的结构,否则不要硬编码分区号,因为这样您的代码将在不同大小的集群上以次优方式运行。相反,使用下面的技巧来确定集群中执行器的数量。然后可以根据所使用的机器乘以每个执行器的内核数。

// -1 is for the driver node
val numExecutors = sparkContext.getExecutorStorageStatus.length - 1

作为一个参考点,在我们的团队中,我们使用相当复杂的数据结构,这意味着ram大小>>磁盘大小,我们的目标是将s3对象保持在50-250mb范围内,以便在每个executor核心都有10-20gbram的节点上进行处理。
希望这有帮助。

相关问题