scala 在Spark中导入parquet文件时出现内存问题

8ftvxx2r  于 12个月前  发布在  Scala
关注(0)|答案(2)|浏览(170)

我尝试在Scala Spark(1.5)中查询parquet文件中的数据,包括一个200万行的查询(以下代码中的“变体”)。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)  
sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")

val parquetFile = sqlContext.read.parquet(<path>)

parquetFile.registerTempTable("tmpTable")
sqlContext.cacheTable("tmpTable")

val patients = sqlContext.sql("SELECT DISTINCT patient FROM tmpTable ...)

val variants = sqlContext.sql("SELECT DISTINCT ... FROM tmpTable ... )

字符串
当获取的行数较低时,此操作运行正常,但当请求大量数据时,此操作会失败,并出现“Size exceeds 10.MAX_VALUE”错误。错误如下:

User class threw exception: org.apache.spark.SparkException:
Job aborted due to stage failure: Task 43 in stage 1.0 failed 4 times,
most recent failure: Lost task 43.3 in stage 1.0 (TID 123, node009):
java.lang.RuntimeException: java.lang.IllegalArgumentException:
Size exceeds Integer.MAX_VALUE at
sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125) at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113) at ...


我能做些什么来使这工作吗?
这看起来像是一个内存问题,但是我尝试使用了多达100个执行程序,没有任何区别(无论涉及多少执行程序,失败所需的时间也是一样的)。
我试图通过简单地替换这行代码来强制更高的并行化,但无济于事:

val variants = sqlContext.sql("SELECT DISTINCT ... FROM tmpTable ... ).repartition(sc.defaultParallelism*10)

k97glaaz

k97glaaz1#

我不认为这个问题是特定于parquet的。你正在“击中”Spark中分区最大大小的限制。
在sun.nio.ch.FileChannelImpl.map(FileScriptelImpl.java:828)上的大小超过了最大值。
MAX_VALUE检测到你有一个大小(我相信)超过2GB的分区(需要超过一个int32来索引它)。
Joe Widen的评论是正确的。你需要更多地重新分区你的数据。尝试1000或更多。
例如,在一个示例中,

val data = sqlContext.read.parquet("data.parquet").rdd.repartition(1000).toDF

字符串

jobtbby3

jobtbby32#

你也可以尝试使用下面的spark配置来限制分区大小:

spark.sql.files.maxPartitionBytes = <Size in bytes> (Default 128 MB)

字符串
根据https://spark.apache.org/docs/latest/sql-performance-tuning.html
当阅读文件时,打包到单个分区中的最大字节数。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。
这确保了在任何给定的时间节点的每个核心只在指定的卷上工作。Spark将根据此值创建适当数量的分区,以增加程序的并行度。将此值设置为0.25 * memory per core是一个很好的经验法则
举例来说:

If your node has 16 cores, 32 GB mem. 
mem_per_core = 32/16 = ~2 GB (ignoring the overhead)
max_partition_mb = 0.25 * (2 * 1024) = 512 MBs

相关问题