I am using Spark (2.2.0) with Elasticsearch Hadoop (7.6.0).
The purpose of the Spark job is to process records from Parquet files and append them, via a unique id, to documents that already exist in Elasticsearch. Since Elasticsearch does not support this kind of update directly, fetching the existing records and applying the updates is handled by the job itself.
I have about 20 million records in the index. At any given time I never need all of them, so I use filter pushdown to fetch only the documents I need.
For performance reasons, the maximum number of items you can push down is 65536. I raised it to 100K but did not push it any further, since the average number of records fetched is between 2 and 3 million.
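(For context, 65536 is Elasticsearch's default index.max_terms_count, the cap on the number of values in a single terms query, which I assume is the limit in play here. A minimal sketch of raising it, with a hypothetical host and index name and using only the JDK, roughly looks like this:

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

// Sketch only: bump index.max_terms_count (default 65536) for one index.
// Host and index name below are placeholders, not my real cluster.
def raiseMaxTermsCount(esHost: String, indexName: String, limit: Int): Unit = {
  val conn = new URL(s"http://$esHost/$indexName/_settings")
    .openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("PUT")
  conn.setDoOutput(true)
  conn.setRequestProperty("Content-Type", "application/json")
  val body = s"""{"index":{"max_terms_count":$limit}}"""
  val out = conn.getOutputStream
  out.write(body.getBytes(StandardCharsets.UTF_8))
  out.close()
  require(conn.getResponseCode == 200, s"settings update failed: HTTP ${conn.getResponseCode}")
  conn.disconnect()
}

// raiseMaxTermsCount("localhost:9200", "my-index", 100000)

)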
So the goal is to build a DataFrame of 100K records per request and combine them with union.
The relevant part of my code is below:
// Load the index as a DataFrame via elasticsearch-hadoop
val df = sparkSession
  .sqlContext
  .read
  .format("es")
  .load(index)

val CURSOR_SIZE = 100000

// filter is a (columnName, values) pair: split the pushdown values into 100K chunks,
// build one filtered DataFrame per chunk, and union them all together
val cursors = filter._2.grouped(CURSOR_SIZE)
cursors.map(cursor => df.filter($"${filter._1}".isin(cursor:_*))).reduce(_ union _)
With the code above, the Spark UI gets stuck with no tasks launched once collect() is called, until I get an OOM error:
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120)
at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95)
at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1883)
at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1792)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1190)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:136)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
To parallelize this with an RDD, I tried the following:
// Second attempt: distribute the chunks as an RDD; df ends up captured inside the map closure
session
  .sparkContext
  .parallelize(cursors)
  .map(cursor => df.filter($"${filter._1}".isin(cursor:_*)))
  .reduce(_ union _)
which throws a NullPointerException. I understand the problem with the second approach: DataFrame and RDD are driver-side abstractions, so executors cannot operate on them.
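For what it's worth, the same rule holds outside of elasticsearch-hadoop; here is a minimal local sketch (toy data, not my actual job) contrasting the two shapes:

import org.apache.spark.sql.SparkSession

// Toy example: chunked isin filters combined on the driver vs. inside an RDD closure
val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
import spark.implicits._

val demo = Seq(1, 2, 3, 4).toDF("id")
val chunks = Seq(Seq(1, 2), Seq(3, 4))

// Works: the Seq#map/reduce run on the driver; only the combined plan is executed on executors
chunks.map(chunk => demo.filter($"id".isin(chunk: _*))).reduce(_ union _).show()

// Fails: demo is a driver-side object captured in an RDD closure and dereferenced on executors
// spark.sparkContext.parallelize(chunks)
//   .map(chunk => demo.filter($"id".isin(chunk: _*)))
//   .reduce(_ union _)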
But after trying all of this, I have run out of ideas about what else to try. I would really appreciate it if someone could point me in the right direction.
Thank you!!
Update: I updated the code snippet to focus more closely on the problem.