spark ui在尝试创建动态Dataframe时卡住

4si2a6ki  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(257)

我正在使用 Spark (2.2.0)ElasticSeach Hadoop (7.6.0) spark作业的目的是处理Parquet文件中的记录,并通过unique将其附加到elasticsearch中已经存在的文档中。由于elasticsearch不支持更新,因此获取记录和更新记录的部分由作业处理。
我有大约 20 million records 在索引中。在任何时候,我都不需要所有的记录,因此我使用过滤器下推来获取所需数量的文档。
出于性能原因,您可以下推的最大记录项数为 65536 . 我长大了 100K 但没有进一步移动它,因为获取的平均记录数介于 2-3 million .
所以我们的目标是创建Dataframe 100K 每个请求的记录,并使用 union 我的零件代码如下

val df = sparkSession
      .sqlContext
      .read
      .format("es")
      .load(index)

val CURSOR_SIZE = 100000
val cursors = filter._2.grouped(CURSOR_SIZE)
    cursors.map(cursor => df.filter($"${filter._1}".isin(cursor:_*))).reduce(_ union _)

有了上面的代码,spark用户界面就陷入了没有任务启动的困境 collect() 函数完成,直到我得到一个oom错误。

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
    at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120)
    at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95)
    at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1883)
    at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1792)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1190)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:136)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)

为了使用rdd进行并行化,我尝试了以下方法:

session
 .sparkContext
 .parallelize(cursors)
 .map(cursor => df.filter($"${filter._1}".isin(cursor:_*)))
 .reduce(_ union _)

它抛出一个 NullPointerException 我理解第二次进近的问题,因为 DataFrame 以及 RDD 是抽象的驱动程序概念,因此执行者不能对它们进行操作。
但在尝试了所有这些之后,我已经没有主意了,还有什么可以尝试的。如果有人能给我指出正确的方向,我将不胜感激。
谢谢您!!
更新:更新了代码片段以更密切地关注问题。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题