热释光;我博士通过增加分区的数量来修复我的代码,使之等于文件的数量,但是我不明白为什么这样做,或者这是不是最好的方法。任何意见都将不胜感激。
我一直在尝试通过 SparkConf
但这通常会导致内存开销错误(超过5gb的默认分配)。解决办法是 spark.default.parallelism
到690(s3中690个单独文件的100 GB)。
难道每个执行者一次不能处理多个文件吗?
函数 process_files
由以下部分组成:
从s3下载文本文件
计算一个单词的示例
产量计数和其他元数据
def run():
'''docstring for run'''
conf = SparkConf() \
.set("spark.default.parallelism", 690)
sc = SparkContext(
appName='spark-cc-analysis',
conf=conf)
sqlc = SQLContext(sparkContext=sc)
filename = config.input_file
pathlist = pathlist_from_csv(filename)
rdd = sc.parallelize(pathlist)
results = rdd.mapPartitions(process_files).collect()
columns = ['file_name','timestamp','entity','entity_count']
df = sqlc.createDataFrame(results,columns)
df.show()
output = config.output
df.write.mode('overwrite').parquet(output)
并行度设置为100时发生的内存开销错误
WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 13 for
reason Container killed by YARN for exceeding memory limits. 5.1 GB of 5.0 GB physical memory
used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-
check-enabled because of YARN-4714.
1条答案
按热度按时间e37o9pze1#
这完全取决于文件的大小和Map的操作。这里没有人能告诉你为什么你的执行者使用了和它一样多的内存。
您将不得不通过日志分析来调试执行器的资源消耗。或者如果成本不是问题,你可以增加内存而不用担心。