java.lang.OutOfMemoryError: Java heap space when fetching 120 million rows from a database in PySpark

zazmityj · asked 2021-05-29 in Spark

I am very new to PySpark/Apache Spark. I need to fetch several tables from a database on a server, each containing roughly 120 million rows or more, for analysis, and I should be able to run computations on the data. I am running PySpark on a single server acting as both master and worker, with 7.45 GB of RAM. I have installed the JDBC driver, and this is the code I use:

from pyspark.sql import SQLContext
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

hostname = "xx.xxx.xx.xx"
dbname = "AAA"
jdbcPort = 3306
username = "xxxxx"
password = "yyyyy"

jdbc_url = "jdbc:mysql://{}:{}/{}?user={}&password={}".format(hostname, jdbcPort, dbname, username, password)
query = "(SELECT * FROM SAMPLE_TABLE_NAME) alias_name"
df = sqlContext.read.format('jdbc').options(driver='com.mysql.jdbc.Driver', url=jdbc_url, dbtable=query).load()

The query loads fine, but when I call df.show(), it prints the following:

[Stage 0:>                                                          (0 + 1) / 1]20/06/11 11:54:29 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.OutOfMemoryError: Java heap space
    at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2210)
    at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1989)
    at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3410)
    at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:470)
    at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3112)
    at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2341)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2736)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2484)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
    at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
20/06/11 11:54:29 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 0,5,main]
java.lang.OutOfMemoryError: Java heap space
    at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2210)
    at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1989)
    at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3410)
    at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:470)
    at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3112)
    at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2341)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2736)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2484)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
    at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
20/06/11 11:54:29 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
    at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2210)
    at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1989)
    at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3410)
    at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:470)
    at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3112)
    at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2341)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2736)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2484)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
    at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:304)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

20/06/11 11:54:29 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 380, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a,**kw)
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaErrorERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
: <exception str() failed>

I also read that Spark uses only a fraction of the available RAM for computation, and that driver memory and executor memory default to 1024 MB and 512 MB. So I launched PySpark from the terminal with the following and ran the same code shown above:

pyspark --jars /home/ubuntu/mysql-connector-java-5.1.45/mysql-connector-java-5.1.45-bin.jar --driver-memory 7G --executor-memory 7G

This made the java.lang.OutOfMemoryError: Java heap space error go away, but raised other errors instead, such as py4j.protocol.Py4JNetworkError: Answer from Java side is empty and IndexError: pop from an empty deque. Can someone explain what is happening, what I am doing wrong, and what I need to do to fix it?
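For reference, a minimal sketch of how the same memory settings can be supplied when the session is built from a standalone script, assuming Spark 2.x; they take effect only if the driver JVM has not started yet, so this does nothing in an already-running shell:

from pyspark.sql import SparkSession

# Memory settings must be set before the driver JVM launches, e.g. when
# the script is started with plain python or via spark-submit defaults.
spark = SparkSession.builder \
    .appName('mysql-read') \
    .config('spark.driver.memory', '7g') \
    .config('spark.executor.memory', '7g') \
    .config('spark.jars', '/home/ubuntu/mysql-connector-java-5.1.45/mysql-connector-java-5.1.45-bin.jar') \
    .getOrCreate()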

wz3gfoph 1#

You are trying to load too much data into RAM. You should first reduce the amount of data SQL retrieves before it ever reaches Spark, and then optimize the read with Spark parameters (e.g., partitioning).
Consider one or more of the following optimizations:

- Specify explicitly which columns you SELECT and, where possible, fetch only the columns you actually need;
- (Raw query) Read in a while loop, fetching one bounded chunk of rows per pass. The technique works by fixing a constant n_rows to read into memory and advancing an index i on each iteration, using one of LIMIT i, n_rows / BETWEEN i AND i+n_rows / WHERE primaryKey >= i AND primaryKey < i+n_rows (see the sketch after this list);
- Use partitioning, via partitionColumn, lowerBound, upperBound, and numPartitions (reference 1) and (reference 2):
  - partitionColumn selects the column (e.g., a primary key) used to determine how to split the data.
  - lowerBound sets the minimum value of partitionColumn that will be fetched.
  - upperBound sets the maximum value of partitionColumn that will be fetched.
  - numPartitions is the number of parallel connections Spark opens to read the data from the RDBMS.
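A minimal sketch of the chunked-read idea, reusing sqlContext and jdbc_url from the question. The integer primary-key column id, the subquery alias t, and the chunk size are assumptions for illustration, not part of the original post; this is also a natural place to replace SELECT * with just the columns you need:

n_rows = 1000000  # rows held in memory per pass; tune to your RAM
# Fetch the key range once so the loop has a stopping point.
max_id = sqlContext.read.format('jdbc').options(
    driver='com.mysql.jdbc.Driver', url=jdbc_url,
    dbtable='(SELECT MAX(id) AS max_id FROM SAMPLE_TABLE_NAME) t'
).load().collect()[0]['max_id']

i = 0
while i <= max_id:
    # Each chunk is bounded, so the MySQL driver never materializes
    # more than roughly n_rows rows at a time.
    chunk_query = ('(SELECT * FROM SAMPLE_TABLE_NAME '
                   'WHERE id >= {} AND id < {}) t').format(i, i + n_rows)
    chunk = sqlContext.read.format('jdbc').options(
        driver='com.mysql.jdbc.Driver', url=jdbc_url, dbtable=chunk_query
    ).load()
    # ... run the per-chunk computation here ...
    i += n_rows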
Spark will then retrieve your dataset using the rows you would get by executing SELECT * FROM table WHERE partitionColumn BETWEEN lowerBound AND upperBound.
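A minimal sketch of the partitioned read, again reusing sqlContext and jdbc_url from the question; the column name id and the bound/partition values are assumptions to adapt to your own table:

df = sqlContext.read.format('jdbc').options(
    driver='com.mysql.jdbc.Driver',
    url=jdbc_url,
    dbtable='SAMPLE_TABLE_NAME',
    partitionColumn='id',     # assumed numeric primary key
    lowerBound='1',
    upperBound='120000000',   # roughly the 120M rows in the question
    numPartitions='16'        # 16 parallel JDBC connections
).load()

Note that the bounds only control how the id range is striped across partitions; rows outside [lowerBound, upperBound] are still read, by the first and last partitions.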
