我尝试使用下推 predicate 读取数据集的子集。我的输入数据集由存储在s3上的1、2tb和436个Parquet文件组成。使用push-down predicate ,我应该读取1/4的数据。
看到spark ui。我看到这个作业实际上读取了1/4的数据(300gb),但是在作业的第一阶段仍然有43436个分区,但是这些分区中只有1/4有数据,其他3/4是空的(请检查所附屏幕截图中的中间输入数据)。
我希望spark只为非空分区创建分区。我发现,使用pushdown predicate 读取整个数据集时,与直接由另一个作业(数据的1/4)读取预过滤的数据集相比,性能开销要高出20%。我怀疑这种开销是由于在我的第一个阶段中有大量的空分区/任务造成的,因此我有两个问题:
有什么解决方法可以避免这些空分区吗?
你认为这项开销还有其他原因吗?可能是下推过滤器执行自然有点慢?
先谢谢你
3条答案
按热度按时间gg58donl1#
空分区:似乎spark(2.4.5)试图真正拥有大小相同的分区≈ spark.sql.files.maxpartitionbytes(默认128mb)将多个文件打包到一个分区中,源代码在这里。但是,它在运行作业之前就完成了这项工作,因此它不知道在应用下推 predicate 之后,有3/4的文件不会输出数据。对于只放置行将被过滤掉的文件的分区,我最终得到了空分区。这也解释了为什么我的最大分区大小是44mb而不是128mb,因为没有一个分区碰巧有通过所有下推过滤器的文件。
20%的开销:最后,这不是由于空分区,我通过将spark.sql.files.maxpartitionbytes设置为1gb,成功地减少了空分区,但并没有提高读取效率。我认为开销是由于打开许多文件并读取它们的元数据造成的。spark估计打开一个文件相当于读取4mb spark.sql.files.opencostinbytes。因此,打开许多文件,即使由于过滤器无法读取,也不应忽略不计。。
pjngdqdw2#
似乎你的文件很小:1.2tb/436≈ 30兆。所以你可以考虑增加
spark.sql.files.maxPartitionBytes
,以查看它是否减少了分区的总数。我对s3没有太多的经验,所以不确定它是否有用,因为在它的描述中有这样一个注解:读取文件时要打包到单个分区中的最大字节数。此配置仅在使用基于文件的源(如parquet、json和orc)时有效。
ukxgm1gy3#
使用s3select,您只能检索数据的一个子集。
对于AmazonEMR 5.17.0及更高版本,您可以在AmazonEMR上使用S3select和spark。s3select允许应用程序仅从对象中检索数据的子集。
否则,s3充当对象存储,在这种情况下,必须读取整个对象。在您的情况下,您必须读取所有文件中的所有内容,并在客户端对其进行过滤。
实际上还有一个非常类似的问题,通过测试你可以看到:
输入大小始终与处理所有数据的spark作业相同
您还可以看到关于优化从Parquet文件的s3读取的数据的问题。