我一直在写这个代码有一段时间了。下面我列出了代码和我在emr上使用的大多数集群属性。代码的目的是根据一些基本的迭代将一些csv文件按一定的行号拆分为两个(我在下面的代码中包含了一个简单的拆分)。
我经常犯这个错误” Container killed by YARN for exceeding memory limits
“并遵循这些设计原则(链接如下)来解决它,但我只是不知道为什么这会导致内存问题。我有超过22gb的Yarn开销,文件是在mb到一位数gb的范围。
我有时使用r5a.12xlarges没有用。我真的没有看到这段代码有任何内存泄漏。它看起来也很慢,只能在16小时内处理20gb左右的输出到s3。这是一个很好的方法来并行这个分割操作吗?有内存泄漏吗?有什么好处?
https://aws.amazon.com/premiumsupport/knowledge-center/emr-spark-yarn-memory-limit/
[
{
"Classification": "spark",
"Properties": {
"spark.maximizeResourceAllocation": "true"
}
},
{
"Classification": "spark-defaults",
"Properties": {
"spark.yarn.executor.memoryOverheadFactor":".2"
}
},
{
"Classification": "spark-env",
"Configurations": [
{
"Configurations": [],
"Properties": {
"PYSPARK_PYTHON": "python36"
},
"Classification": "export"
}
],
"Properties": {
}
}
]
def writetxt(txt: Union[List[str], pandas.DataFrame], path: str) -> None:
s3 = boto3.resource('s3')
s3path = S3Url(path)
object = s3.Object(s3path.bucket, s3path.key)
if isinstance(txt, pandas.DataFrame):
csv_buffer = StringIO()
txt.to_csv(csv_buffer)
object.put(Body=csv_buffer.getvalue())
else:
object.put(Body='\n'.join(txt).encode())
def main(
x: Iterator[Tuple[str, str]],
output_files: str
) -> None:
filename, content = x
filename = os.path.basename(S3Url(filename).key)
content = content.splitlines()
# Split the csv file
columnAttributes, csvData = data[:100], data[100:]
writetxt(csvData, os.path.join(output_files, 'data.csv', filename))
writetxt(columnAttributes, os.path.join(output_files, 'attr.csv', filename))
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Split some mishapen csv files.')
parser.add_argument('input_files', type=str,
help='The location of the input files.')
parser.add_argument('output_files', type=str,
help='The location to put the output files.')
parser.add_argument('--nb_partitions', type=int, default=4)
args = parser.parse_args()
# creating the context
sc = SparkContext(appName="Broadcom Preprocessing")
# We use minPartitions because otherwise small files get put in the same partition together
# by default, which we have a lot of
# We use foreachPartition to reduce the number of function calls, which slow down spark
distFiles = sc.wholeTextFiles(args.input_files, minPartitions=args.nb_partitions) \
.foreach(partial(main, output_files=args.output_files))
1条答案
按热度按时间fafcakar1#
我认为您的内存问题是因为您正在使用python代码进行实际的数据拆分。spark进程在jvm中运行,但是当您调用自定义python代码时,必须将相关数据序列化到python进程(在每个工作节点上)才能执行。这会增加很多开销。我相信您完全可以通过spark操作来完成您想要完成的任务,这意味着最终的程序将完全在基于jvm的spark进程中运行。
尝试以下操作:
最后,您需要使用一些定制的python代码将每个分割文件保存到s3(或者,您可以用scala/java编写所有代码)。通过udfs来实现这一点比将标准python函数传递给
.foreach(...)
. 在内部,spark将把数据序列化为arrow格式的块(每个分区一个),这将非常有效。此外,看起来您正试图在一个请求中将整个对象放入s3。如果数据太大,它将失败。你应该看看s3流上传功能。