Spark executors run out of memory in join and reduceByKey

Posted by jmo0nnb3 on 2021-05-27 in Spark

In Spark 2.0, I have two DataFrames that I need to join first and then aggregate with reduceByKey. I always get OOM on the executors. Thanks in advance.

Data

d1 (1 GB, 500 million rows, cached, partitioned by column id2)

id1 id2
1   1
1   3
1   4
2   0
2   7
...

d2 (160 GB, 2 million rows, cached, partitioned by column id2; the value column contains a list of 5000 floats)

id2   value
0     [0.1, 0.2, 0.0001, ...]
1     [0.001, 0.7, 0.0002, ...]
...

Now I need to join the two tables to get d3, using spark.sql:

select d1.id1, d2.value
FROM d1 JOIN d2 ON d1.id2 = d2.id2

Then I do a reduceByKey on d3 and aggregate the values for each id1 in table d1:

import numpy

# Sum the vectors per id1, then L1-normalize each sum.
d4 = d3.rdd.reduceByKey(lambda x, y: numpy.add(x, y)) \
           .mapValues(lambda x: (numpy.asarray(x) / numpy.linalg.norm(x, 1)).tolist()) \
           .toDF()
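
To make the aggregation concrete, here is a minimal plain-Python sketch of what the reduceByKey and mapValues steps compute, run on a toy dataset. The function name and the toy data are hypothetical, and the zero-norm guard is an addition that the original pipeline does not have.

```python
def sum_and_l1_normalize(pairs):
    """Sum the vectors that share a key, then divide each sum by its L1 norm."""
    sums = {}
    for key, vec in pairs:
        if key in sums:
            # Element-wise addition, the role numpy.add plays in reduceByKey.
            sums[key] = [a + b for a, b in zip(sums[key], vec)]
        else:
            sums[key] = list(vec)
    result = {}
    for key, vec in sums.items():
        norm = sum(abs(v) for v in vec)  # L1 norm, as in numpy.linalg.norm(x, 1)
        # Guard against an all-zero vector (not present in the original code).
        result[key] = [v / norm for v in vec] if norm else vec
    return result


# Toy stand-in for d3: (id1, value) pairs with 2-element vectors.
toy_d3 = [(1, [1.0, 3.0]), (1, [2.0, 2.0]), (2, [0.0, 5.0])]
normalized = sum_and_l1_normalize(toy_d3)
print(normalized)
```

Each output vector sums to 1 in absolute value, which is why d4 has one 5000-element vector per distinct id1.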

I estimate the size of d4 to be 340 GB. I am currently running the job on an r3.8xlarge machine:

mem: 244G
cpu: 64
Disk: 640G
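
A rough back-of-the-envelope calculation helps put these sizes in context. The sketch below assumes each float is stored as an 8-byte double, ignores all serialization and object overhead, and assumes every row of d1 matches exactly one row of d2 (so the join output d3 has one vector per d1 row). These are assumptions for illustration, not measurements.

```python
BYTES_PER_DOUBLE = 8       # assumption: values are 64-bit floats
VECTOR_LENGTH = 5000       # from the description of d2
D1_ROWS = 500_000_000      # from the description of d1
D2_ROWS = 2_000_000        # from the description of d2

vector_bytes = VECTOR_LENGTH * BYTES_PER_DOUBLE   # raw bytes per value vector
d2_raw_bytes = D2_ROWS * vector_bytes             # raw payload of d2
d3_raw_bytes = D1_ROWS * vector_bytes             # raw payload of the join output

print("bytes per vector:", vector_bytes)
print("d2 raw payload (GB):", d2_raw_bytes / 1e9)
print("d3 raw payload (TB):", d3_raw_bytes / 1e12)
```

Under these assumptions the join output carries on the order of 20 TB of vector data before the reduceByKey collapses it, far more than either input table, so the join and the hand-off to Python are where memory pressure is likely to concentrate.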

Question

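I have played with several configurations, but I always get OOM on the executors. So my questions are:
1. Is it possible to run this job on the current type of machine, or should I use a bigger machine (and how big)? I do remember reading articles and blog posts about processing terabytes of data on relatively small machines.
2. What improvements should I make, e.g. Spark configuration or code optimization?
3. Is it possible to estimate the amount of memory needed by each executor?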

Spark configuration

I have tried several Spark configurations.
Configuration 1:

--verbose
--conf spark.sql.shuffle.partitions=200
--conf spark.dynamicAllocation.enabled=false
--conf spark.driver.maxResultSize=24G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--num-executors 4
--executor-memory 48G
--executor-cores 15
--driver-memory 24G
--driver-cores 3

Configuration 2:

--verbose
--conf spark.sql.shuffle.partitions=10000
--conf spark.dynamicAllocation.enabled=false
--conf spark.driver.maxResultSize=24G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--num-executors 4
--executor-memory 48G
--executor-cores 15
--driver-memory 24G
--driver-cores 3

Configuration 3:

--verbose
--conf spark.sql.shuffle.partitions=10000
--conf spark.dynamicAllocation.enabled=true
--conf spark.driver.maxResultSize=6G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--executor-memory 6G
--executor-cores 2
--driver-memory 6G
--driver-cores 3

Configuration 4:

--verbose
--conf spark.sql.shuffle.partitions=20000
--conf spark.dynamicAllocation.enabled=false
--conf spark.driver.maxResultSize=6G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--num-executors 13
--executor-memory 15G
--executor-cores 5
--driver-memory 13G
--driver-cores 5

Errors

OOM error 1 from an executor

ExecutorLostFailure (executor 14 exited caused by one of the running  tasks) Reason: Container killed by YARN for exceeding memory limits. 9.1 GB of 9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Heap
PSYoungGen      total 1830400K, used 1401721K [0x0000000740000000,   0x00000007be900000, 0x00000007c0000000)
eden space 1588736K, 84% used [0x0000000740000000,0x0000000791e86980,0x00000007a0f80000)
from space 241664K, 24% used [0x00000007af600000,0x00000007b3057de8,0x00000007be200000)
to  space 236032K, 0% used [0x00000007a0f80000,0x00000007a0f80000,0x00000007af600000)
ParOldGen      total 4194304K, used 4075884K [0x0000000640000000, 0x0000000740000000, 0x0000000740000000)
object space 4194304K, 97% used [0x0000000640000000,0x0000000738c5b198,0x0000000740000000)
Metaspace      used 59721K, capacity 60782K, committed 61056K,  reserved 1101824K
class space    used 7421K, capacity 7742K, committed 7808K, reserved 1048576K
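
The message above comes from YARN rather than from the JVM: the container is killed when the whole process tree exceeds the container size, which is the executor heap plus spark.yarn.executor.memoryOverhead. In PySpark jobs the Python worker processes run outside the JVM heap, so their memory counts against that overhead. The sketch below uses the default documented for Spark 2.0 (10% of executor memory, with a 384 MB minimum); the helper names are hypothetical, and YARN's rounding of requests up to its allocation increment is not modeled.

```python
def default_memory_overhead_mb(executor_memory_mb):
    """Default spark.yarn.executor.memoryOverhead in Spark 2.0:
    10% of executor memory, with a floor of 384 MB."""
    return max(384, int(executor_memory_mb * 0.10))


def container_request_mb(executor_memory_mb, overhead_mb=None):
    """Memory Spark asks YARN for per executor: heap plus off-heap overhead."""
    if overhead_mb is None:
        overhead_mb = default_memory_overhead_mb(executor_memory_mb)
    return executor_memory_mb + overhead_mb


# --executor-memory 6G with the default overhead.
print(container_request_mb(6 * 1024))
# The same heap with a hypothetical explicit 3 GB overhead.
print(container_request_mb(6 * 1024, overhead_mb=3072))
```

Raising the overhead leaves more room for the Python workers, but it does not help when the JVM heap itself is exhausted, as in the stack traces further down.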

OOM error 2 from an executor

ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container marked as failed: container_1477662810360_0002_01_000008 on host: ip-172-18-9-130.ec2.internal. Exit status: 52. Diagnostics: Exception from container-launch.

Heap
PSYoungGen      total 1968128K, used 1900544K [0x0000000740000000, 0x00000007c0000000, 0x00000007c0000000)
eden space 1900544K, 100% used [0x0000000740000000,0x00000007b4000000,0x00000007b4000000)
from space 67584K, 0% used [0x00000007b4000000,0x00000007b4000000,0x00000007b8200000)
to  space 103936K, 0% used [0x00000007b9a80000,0x00000007b9a80000,0x00000007c0000000)
ParOldGen      total 4194304K, used 4194183K [0x0000000640000000, 0x0000000740000000, 0x0000000740000000)
object space 4194304K, 99% used [0x0000000640000000,0x000000073ffe1f38,0x0000000740000000)
Metaspace      used 59001K, capacity 59492K, committed 61056K, reserved 1101824K
class space    used 7300K, capacity 7491K, committed 7808K, reserved 1048576K

Errors from the container

16/10/28 14:33:21 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
16/10/28 14:33:26 ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:504)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$doExecute$3$$anon$2.hasNext(WholeStageCodegenExec.scala:386)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
16/10/28 14:33:36 ERROR Utils: Uncaught exception in thread driver-heartbeater
16/10/28 14:33:26 ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.Double.valueOf(Double.java:519)
    at org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.get(UnsafeArrayData.java:138)
    at org.apache.spark.sql.catalyst.util.ArrayData.foreach(ArrayData.scala:135)
    at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:64)
    at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:57)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2517)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2517)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:121)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
16/10/28 14:33:43 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[stdout writer for python,5,main]

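Update 1

When partitioned by id2, the data in d1 turns out to be heavily skewed, and as a result the join causes OOM. If d1 were evenly distributed, as I had assumed earlier, the configurations above should work.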

Update 2

I am posting my attempts to solve the problem in case someone else runs into a similar issue.

Attempt 1

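My problem is that d1 is heavily skewed when partitioned by id2, so a few partitions end up containing almost all of the id1 values, and the join with d2 then causes OOM errors. To mitigate this, I first identified the subset s of id2 values that cause the skew when partitioning by id2. Then I created d5 from d2 containing only the keys in s, and d6 from d2 excluding s. Luckily, d5 is not too big, so I can broadcast-join d1 with d5. Then I join d1 with d6, union the two results, and do the reduceByKey. I came very close to solving the problem this way. I did not pursue it further because my d1 may grow much larger later; in other words, this approach is not really scalable for me.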
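
The split described in this attempt could be sketched roughly as follows. This is an illustration under stated assumptions, not the code actually used: the function names and the row-count threshold are hypothetical, d1 and d2 are assumed to be PySpark DataFrames with the columns shown earlier, and the per-key counts are assumed to have been collected beforehand (for example from d1.groupBy("id2").count()).

```python
def heavy_keys_from_counts(counts, max_rows_per_key):
    """Return the id2 values whose row count in d1 exceeds the threshold.

    counts maps id2 -> number of d1 rows with that id2.
    """
    return {key for key, n in counts.items() if n > max_rows_per_key}


def skew_aware_join(d1, d2, heavy_keys):
    """Join d1 and d2 on id2, broadcasting the d2 rows for the skewed keys."""
    from pyspark.sql import functions as F  # imported lazily; requires pyspark

    heavy = list(heavy_keys)
    d5 = d2.filter(F.col("id2").isin(heavy))    # only the skewed keys (small)
    d6 = d2.filter(~F.col("id2").isin(heavy))   # everything else

    # Broadcast join for the skewed keys avoids shuffling their d1 rows.
    hot = d1.join(F.broadcast(d5), "id2").select("id1", "value")
    # Regular join for the remaining keys.
    cold = d1.join(d6, "id2").select("id1", "value")
    return hot.union(cold)


# Toy counts: key 1 dominates and would be treated as skewed.
print(heavy_keys_from_counts({0: 3, 1: 1000, 7: 5}, max_rows_per_key=100))
```

The broadcast side must fit in memory on the driver and on every executor, which is the scalability limit mentioned above.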

Attempt 2

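Luckily, in my case most of the values in d2 are very small. Based on my application, I can safely drop the small values and convert each vector to a SparseVector, which significantly reduces the size of d2. After doing so, I partition d1 by id1 and broadcast-join it with d2 (after dropping the small values). Of course, the driver memory has to be increased to allow for the relatively large broadcast variable. This works for me and is also scalable for my application.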
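
The idea of dropping small values can be illustrated in plain Python. The sketch below uses a dict of index to value as a stand-in for a SparseVector; the function names and the 0.01 threshold are hypothetical, and the right cutoff depends entirely on the application.

```python
def sparsify(dense, threshold):
    """Keep only entries whose magnitude is at least threshold.

    Returns a dict mapping index -> value, a stand-in for a sparse vector.
    """
    return {i: v for i, v in enumerate(dense) if abs(v) >= threshold}


def add_sparse(a, b):
    """Element-wise sum of two sparse vectors stored as dicts."""
    out = dict(a)
    for i, v in b.items():
        out[i] = out.get(i, 0.0) + v
    return out


# The first three entries of the two sample rows of d2.
row0 = sparsify([0.1, 0.2, 0.0001], threshold=0.01)
row1 = sparsify([0.001, 0.7, 0.0002], threshold=0.01)
total = add_sparse(row0, row1)
print(row0, row1, total)
```

Storage then scales with the number of surviving entries rather than with the full vector length of 5000, which is what makes broadcasting d2 feasible.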

lymgl2op (answer #1)

Here is something to try: reduce your executor size a bit. You currently have:

--executor-memory 48G
--executor-cores 15

Try:

--executor-memory 16G
--executor-cores 5

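Smaller executors seem to work best for a variety of reasons. One of them is that a Java heap larger than 32 GB causes object references to grow from 4 bytes to 8 bytes, which inflates all memory requirements.
Edit: the problem may actually be that the d4 partitions are too large (though the other advice still applies!). You can address this by repartitioning d3 into more partitions (roughly d1 * 4), or by passing that number to the optional numPartitions argument of reduceByKey. Both options trigger a shuffle, but that is better than crashing.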
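
A minimal sketch of the numPartitions suggestion follows. The sizing helper and its 128 MB target are assumptions for illustration, not a rule from this answer, and d3_rdd is assumed to be a PySpark RDD of (id1, list of floats) pairs.

```python
import math


def suggest_num_partitions(total_bytes, target_partition_bytes):
    """Number of partitions so each holds roughly target_partition_bytes."""
    return max(1, math.ceil(total_bytes / target_partition_bytes))


def add_vectors(x, y):
    """Element-wise sum of two equal-length vectors."""
    return [a + b for a, b in zip(x, y)]


def aggregate(d3_rdd, num_partitions):
    """Sum vectors per key, spreading the result over num_partitions."""
    return d3_rdd.reduceByKey(add_vectors, numPartitions=num_partitions)


# Using the 340 GB estimate for d4 and a hypothetical 128 MB target.
n = suggest_num_partitions(340 * 10**9, 128 * 10**6)
print(n)
```

More partitions mean smaller per-task working sets in the reduce stage, at the cost of more tasks and shuffle files.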

idv4meu8 (answer #2)

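I ran into the same problem and found many answers, but none of them solved it. In the end I debugged my code step by step and found that the problem was caused by unbalanced data sizes across partitions. Just call df_rdd.repartition(nums).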
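
One way to check for this kind of imbalance before and after repartitioning is to count the records in each partition. The helper names below are hypothetical; partition_sizes assumes a PySpark RDD and relies only on mapPartitions and collect.

```python
def partition_sizes(rdd):
    """Count the records in each partition of a PySpark RDD."""
    return rdd.mapPartitions(lambda rows: [sum(1 for _ in rows)]).collect()


def skew_ratio(sizes):
    """Largest partition divided by the mean partition size (1.0 means even)."""
    if not sizes or sum(sizes) == 0:
        return 0.0
    return max(sizes) / (sum(sizes) / len(sizes))


# Toy partition sizes: one partition holds almost everything.
print(skew_ratio([97, 1, 1, 1]))
print(skew_ratio([10, 10, 10, 10]))
```

Note that an even repartition does not by itself fix a later join or reduceByKey on a skewed key, because those operations hash the rows by key again.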
