在spark中创建行组大小小于100的Parquet文件

aoyhnmkz  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(399)

我有一个SparkDataframe有少量的领域。一些字段是巨大的二进制斑点。整行的大小约为50MB。
我正在将Dataframe保存为Parquet格式。我正在使用控制行组的大小 parquet.block.size 参数。
spark将生成一个Parquet文件,但是我总是在一个行组中得到至少100行。这对我来说是个问题,因为块大小可能会变成千兆字节,这在我的应用程序中不起作用。 parquet.block.size 只要大小足够容纳100多行,就可以正常工作。
我将internalparquetrecordwriter.java修改为 MINIMUM_RECORD_COUNT_FOR_CHECK = 2 ,解决了这个问题,但是,我找不到支持调优这个硬编码常量的配置值。
有没有其他/更好的方法来获得小于100的行组大小?
这是我的代码片段:

from pyspark import Row
from pyspark.sql import SparkSession
import numpy as np

from pyspark.sql.types import StructType, StructField, BinaryType

def fake_row(x):
    result = bytearray(np.random.randint(0, 127, (3 * 1024 * 1024 / 2), dtype=np.uint8).tobytes())
    return Row(result, result)

spark_session = SparkSession \
    .builder \
    .appName("bbox2d_dataset_extraction") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "4g")

spark_session.master('local[5]')

spark = spark_session.getOrCreate()
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().setInt("parquet.block.size", 8 * 1024 * 1024)

index = sc.parallelize(range(50), 5)
huge_rows = index.map(fake_row)
schema = StructType([StructField('f1', BinaryType(), False), StructField('f2', BinaryType(), False)])

bbox2d_dataframe = spark.createDataFrame(huge_rows, schema).coalesce(1)
bbox2d_dataframe. \
    write.option("compression", "none"). \
    mode('overwrite'). \
    parquet('/tmp/huge/')
5sxhfpxr

5sxhfpxr1#

不幸的是,我还没有找到这样做的方法。我报告此问题是为了删除硬编码值并使其可配置。如果你感兴趣的话,我有一个补丁。

j13ufse2

j13ufse22#

虽然parquet-409还没有修复,但是有几个解决方法可以让应用程序使用它 100 硬编码的每个行组的最小记录数。
第一个问题和解决方法:您提到您的行可能大到50mb。这使得行组大小约为5gb。同时你的spark执行器只有4gb( spark.executor.memory ). 使其明显大于最大行组大小。
我推荐使用12-20gb的大Spark执行器内存 spark.executor.memory . 玩这个,看看哪一个适用于您的数据集。在这个范围内,我们的大多数生产作业都使用spark executor内存。要使其适用于如此大的行组,您可能还需要进行调整 spark.executor.cores 设置为1,以确保每个执行器进程一次只占用一个这样大的行组(以失去一些Spark效率为代价)或许可以试试 spark.executor.cores 设置为2-这可能需要增加 spark.executor.memory 至20-31gb范围(尽量保持在32gb以下,因为jvm切换到非压缩oop,这可能会导致50%的内存开销)
第二个问题和解决方法:5gb的大行块很可能分布在许多hdfs块上,因为默认hdfs块的范围是128-256mb(我假设您使用hdfs来存储这些Parquet文件,因为您有“hadoop”标记)Parquet最佳实践是让行组完全驻留在一个hdfs块中:
行组大小:较大的行组允许较大的列块,这使得执行较大的顺序io成为可能。更大的组也需要在写路径中有更多的缓冲(或两次写入)。我们建议使用大型行组(512mb-1gb)。因为可能需要读取整个行组,所以我们希望它完全适合一个hdfs块。因此,hdfs块大小也应该设置为更大。优化的读取设置是:1gb行组、1gb hdfs块大小、每个hdfs文件1个hdfs块。
下面是如何更改hdfs块大小(在创建此类Parquet文件之前设置)的示例:

sc._jsc.hadoopConfiguration().set("dfs.block.size", "5g")

或者在spark scala中:

sc.hadoopConfiguration.set("dfs.block.size", "5g")

我希望这将是固定在Parquet地板水平有时,但这两个变通办法应该允许您操作与Parquet地板这样大的行组。

相关问题