I have read many articles about Spark's memory usage with collect() or toPandas() (like here). The usual advice is to use them only on small datasets. The question is: how small is small for Spark?
I am running PySpark locally (for testing), with the driver memory set to 20g (I have 32g on my 16-core Mac), but toPandas() crashes even with a dataset of only 20k rows! That can't be right, so I suspect I am doing something wrong. Here is simplified code that reproduces the error:
import pandas as pd
import numpy as np
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# setting the number of rows for the CSV file
N = 20000
ncols = 7
c_name = 'ABCDEFGHIJKLMNOPQRSTUVXYWZ'
# creating a pandas dataframe (df)
df = pd.DataFrame(np.random.randint(999,999999,size=(N, ncols)), columns=list(c_name[:ncols]))
file_name = 'random.csv'
# export the dataframe to csv using comma delimiting
df.to_csv(file_name, index=False)
## Load the csv in spark
df = spark.read.format('csv').option('header', 'true').load(file_name)#.limit(5000)#.coalesce(2)
## some checks
n_parts = df.rdd.getNumPartitions()
print('Number of partitions:', n_parts)
print('Number of rows:', df.count())
## convert spark df -> pandas df with toPandas()
df_p = df.toPandas()
print('With pandas:',len(df_p))
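To put the size in perspective, here is a back-of-envelope estimate (a rough sketch; I am assuming 8 bytes per value, and the CSV load actually yields strings, which are larger but still tiny):

# rough size estimate, reusing N and ncols from above
# assumption: 8-byte int64 per cell, ignoring per-object overhead
n_bytes = N * ncols * 8
print('Approx. size (MB):', n_bytes / 1e6)  # ~1.1 MB -- nowhere near 20g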
I run this in Jupyter and get the following error:
ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to /192.168.0.104:61536
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
...
My local Spark settings are (everything else is default):
('spark.driver.host', '192.168.0.104')
('spark.driver.memory', '20g')
('spark.rdd.compress', 'True')
('spark.serializer.objectStreamReset', '100')
('spark.master', 'local[*]')
('spark.executor.id', 'driver')
('spark.submit.deployMode', 'client')
('spark.app.id', 'local-1618499935279')
('spark.driver.port', '55115')
('spark.ui.showConsoleProgress', 'true')
('spark.app.name', 'pyspark-shell')
('spark.driver.maxResultSize', '4g')
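One thing I notice is that spark.driver.host is my LAN IP (192.168.0.104), which is exactly the address in the failed block fetch. A sketch of something I could try, pinning the driver to localhost when building the session (my assumption that this is relevant; spark.driver.bindAddress and spark.driver.host are standard Spark config keys, and they have to be set before the session is created):

from pyspark.sql import SparkSession

# sketch: bind the driver to localhost instead of the LAN IP
spark = (SparkSession.builder
         .config('spark.driver.bindAddress', '127.0.0.1')
         .config('spark.driver.host', '127.0.0.1')
         .config('spark.driver.memory', '20g')  # same memory setting as above
         .getOrCreate())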
Is my setup wrong, or is it really the case that even 20g of driver memory cannot handle a small dataframe of 20k rows and 7 columns? Would repartitioning help (something like the sketch below)?
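For reference, this is roughly what I had in mind (a sketch; coalesce()/repartition() and the Arrow flag are standard PySpark APIs, but I don't know whether they address the root cause):

# sketch: collect through fewer, larger partitions
df_p = df.coalesce(2).toPandas()

# sketch: Arrow-based conversion (flag name for Spark 3.x;
# on Spark 2.x it is spark.sql.execution.arrow.enabled)
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
df_p = df.toPandas()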