我有一个SparkDataframe有少量的领域。一些字段是巨大的二进制斑点。整行的大小约为50MB。
我正在将Dataframe保存为Parquet格式。我正在使用控制行组的大小 parquet.block.size
参数。
spark将生成一个Parquet文件,但是我总是在一个行组中得到至少100行。这对我来说是个问题,因为块大小可能会变成千兆字节,这在我的应用程序中不起作用。 parquet.block.size
只要大小足够容纳100多行,就可以正常工作。
我将internalparquetrecordwriter.java修改为 MINIMUM_RECORD_COUNT_FOR_CHECK = 2
,解决了这个问题,但是,我找不到支持调优这个硬编码常量的配置值。
有没有其他/更好的方法来获得小于100的行组大小?
这是我的代码片段:
from pyspark import Row
from pyspark.sql import SparkSession
import numpy as np
from pyspark.sql.types import StructType, StructField, BinaryType
def fake_row(x):
result = bytearray(np.random.randint(0, 127, (3 * 1024 * 1024 / 2), dtype=np.uint8).tobytes())
return Row(result, result)
spark_session = SparkSession \
.builder \
.appName("bbox2d_dataset_extraction") \
.config("spark.driver.memory", "12g") \
.config("spark.executor.memory", "4g")
spark_session.master('local[5]')
spark = spark_session.getOrCreate()
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().setInt("parquet.block.size", 8 * 1024 * 1024)
index = sc.parallelize(range(50), 5)
huge_rows = index.map(fake_row)
schema = StructType([StructField('f1', BinaryType(), False), StructField('f2', BinaryType(), False)])
bbox2d_dataframe = spark.createDataFrame(huge_rows, schema).coalesce(1)
bbox2d_dataframe. \
write.option("compression", "none"). \
mode('overwrite'). \
parquet('/tmp/huge/')
2条答案
按热度按时间5sxhfpxr1#
不幸的是,我还没有找到这样做的方法。我报告此问题是为了删除硬编码值并使其可配置。如果你感兴趣的话,我有一个补丁。
j13ufse22#
虽然parquet-409还没有修复,但是有几个解决方法可以让应用程序使用它
100
硬编码的每个行组的最小记录数。第一个问题和解决方法:您提到您的行可能大到50mb。这使得行组大小约为5gb。同时你的spark执行器只有4gb(
spark.executor.memory
). 使其明显大于最大行组大小。我推荐使用12-20gb的大Spark执行器内存
spark.executor.memory
. 玩这个,看看哪一个适用于您的数据集。在这个范围内,我们的大多数生产作业都使用spark executor内存。要使其适用于如此大的行组,您可能还需要进行调整spark.executor.cores
设置为1,以确保每个执行器进程一次只占用一个这样大的行组(以失去一些Spark效率为代价)或许可以试试spark.executor.cores
设置为2-这可能需要增加spark.executor.memory
至20-31gb范围(尽量保持在32gb以下,因为jvm切换到非压缩oop,这可能会导致50%的内存开销)第二个问题和解决方法:5gb的大行块很可能分布在许多hdfs块上,因为默认hdfs块的范围是128-256mb(我假设您使用hdfs来存储这些Parquet文件,因为您有“hadoop”标记)Parquet最佳实践是让行组完全驻留在一个hdfs块中:
行组大小:较大的行组允许较大的列块,这使得执行较大的顺序io成为可能。更大的组也需要在写路径中有更多的缓冲(或两次写入)。我们建议使用大型行组(512mb-1gb)。因为可能需要读取整个行组,所以我们希望它完全适合一个hdfs块。因此,hdfs块大小也应该设置为更大。优化的读取设置是:1gb行组、1gb hdfs块大小、每个hdfs文件1个hdfs块。
下面是如何更改hdfs块大小(在创建此类Parquet文件之前设置)的示例:
或者在spark scala中:
我希望这将是固定在Parquet地板水平有时,但这两个变通办法应该允许您操作与Parquet地板这样大的行组。