Splitting misshapen CSV files with a PySpark RDD on EMR: memory exceeded error

6g8kf2rb asked on 2021-05-27 in Spark

I have been working on this code for a while. Below is the code and most of the cluster properties I am using on EMR. The purpose of the code is to split some CSV files in two at a certain line number, based on some basic iteration (I have included a simple split in the code below).
I keep getting the error "Container killed by YARN for exceeding memory limits" and have followed the design principles in the link below to resolve it, but I just don't understand why this would cause a memory problem. I have over 22 GB of YARN overhead, and the files range from a few MB to single-digit GB.
I have sometimes used r5a.12xlarge instances, to no avail. I really don't see any memory leak in this code. It also seems very slow: it only manages to produce about 20 GB of output to S3 in 16 hours. Is this a good way to parallelize this split operation? Is there a memory leak? Any suggestions?
https://aws.amazon.com/premiumsupport/knowledge-center/emr-spark-yarn-memory-limit/

[
    {
        "Classification": "spark",
        "Properties": {
            "spark.maximizeResourceAllocation": "true"
        }
    },
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.yarn.executor.memoryOverheadFactor":".2"
        }
    },
    {
        "Classification": "spark-env",
        "Configurations": [
            {
                "Configurations": [],
                "Properties": {
                    "PYSPARK_PYTHON": "python36"
                },
                "Classification": "export"
            }
        ],
        "Properties": {
        }
    }
]

from functools import partial
from io import StringIO
from typing import List, Tuple, Union
import argparse
import os

import boto3
import pandas
from pyspark import SparkContext

# S3Url is assumed to be a small helper that parses an s3:// URL into .bucket
# and .key (see the sketch just after this snippet).


def writetxt(txt: Union[List[str], pandas.DataFrame], path: str) -> None:
    s3 = boto3.resource('s3')
    s3path = S3Url(path)
    obj = s3.Object(s3path.bucket, s3path.key)
    if isinstance(txt, pandas.DataFrame):
        csv_buffer = StringIO()
        txt.to_csv(csv_buffer)
        obj.put(Body=csv_buffer.getvalue())
    else:
        obj.put(Body='\n'.join(txt).encode())


def main(
        x: Tuple[str, str],
        output_files: str
) -> None:
    filename, content = x
    filename = os.path.basename(S3Url(filename).key)
    content = content.splitlines()

    # Split the csv file: the first 100 lines are column attributes, the rest is data
    columnAttributes, csvData = content[:100], content[100:]

    writetxt(csvData, os.path.join(output_files, 'data.csv', filename))
    writetxt(columnAttributes, os.path.join(output_files, 'attr.csv', filename))


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Split some misshapen csv files.')
    parser.add_argument('input_files', type=str,
                        help='The location of the input files.')
    parser.add_argument('output_files', type=str,
                        help='The location to put the output files.')
    parser.add_argument('--nb_partitions', type=int, default=4)
    args = parser.parse_args()

    # creating the context
    sc = SparkContext(appName="Broadcom Preprocessing")

    # We use minPartitions because otherwise many small files get put into the same
    # partition by default, and we have a lot of them.
    # foreach applies main to every (filename, content) pair on the executors.
    sc.wholeTextFiles(args.input_files, minPartitions=args.nb_partitions) \
        .foreach(partial(main, output_files=args.output_files))
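
For reference, S3Url is never defined or imported in the snippet above; it is presumably a small helper that splits an s3://bucket/key URL into its bucket and key parts. A minimal sketch under that assumption (the class name and attributes simply mirror how it is used above, not any particular library):

from urllib.parse import urlparse


class S3Url:
    """Assumed helper: parse s3://bucket/some/key into .bucket and .key."""

    def __init__(self, url: str) -> None:
        parsed = urlparse(url, allow_fragments=False)
        self.bucket = parsed.netloc
        # Drop the leading "/" so the key matches what boto3 expects.
        self.key = parsed.path.lstrip('/')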

fafcakar 1#

I think your memory problems come from the fact that you are doing the actual data splitting in Python code. Spark processes run in the JVM, but when you call custom Python code, the relevant data has to be serialized over to a Python process (on each worker node) to execute it, which adds a lot of overhead. I believe you can accomplish what you are trying to do entirely with Spark operations, which means the final program would run entirely inside the JVM-based Spark processes.
Try something like the following:

import pandas as pd

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import *

input_path = "..."

split_num = 100

# load filenames & contents

filesDF = spark.createDataFrame( sc.wholeTextFiles(input_path), ['filename','contents'] )

# break into individual lines & number them
# (posexplode gives 0-based positions per file, so +1 makes the line numbers 1-based)

linesDF = filesDF.select( "filename", \
                          posexplode(split(col("contents"), "\n")).alias("line_number", "contents") ) \
                 .withColumn("line_number", col("line_number") + lit(1))

# split into headers & body

headersDF = linesDF.where(col("line_number") == lit(1))
bodyDF = linesDF.where(col("line_number") > lit(1))

# split the body in 2 based on the line number

splitLinesDF = bodyDF.withColumn("split", when(col("line_number") < lit(split_num), 0).otherwise(1))
split_0_DF = splitLinesDF.where(col("split") == lit(0)).select("filename", "line_number", "contents").union(headersDF).orderBy("filename", "line_number")
split_1_DF = splitLinesDF.where(col("split") == lit(1)).select("filename", "line_number", "contents").union(headersDF).orderBy("filename", "line_number")

# collapse all lines back down into a file
# (collect_list does not guarantee order, so sort the collected structs by line_number explicitly)

firstDF = split_0_DF.groupBy("filename").agg( concat_ws("\n", array_sort(collect_list(struct("line_number", "contents"))).getField("contents")).alias("contents") )
secondDF = split_1_DF.groupBy("filename").agg( concat_ws("\n", array_sort(collect_list(struct("line_number", "contents"))).getField("contents")).alias("contents") )

# pandas-UDF for more memory-efficient (Arrow-based) transfer of data from Spark to Python

@pandas_udf(returnType=IntegerType())
def writeFile(filename: pd.Series, contents: pd.Series) -> pd.Series:
  # <save to S3 here>
  # Return one integer (e.g. a status code or byte count) per input row.
  return pd.Series([0] * len(filename))

# write each row to a file
# (an action such as .collect() is needed, otherwise the select is never executed)

firstDF.select( writeFile( col("filename"), col("contents") ) ).collect()
secondDF.select( writeFile( col("filename"), col("contents") ) ).collect()

In the end, you will still need a little custom Python code to save each split file to S3 (alternatively, you could write everything in Scala/Java). Doing it through a pandas UDF is far more efficient than passing a standard Python function to .foreach(...): internally, Spark serializes the data into Arrow-format batches (one per partition), which is very efficient. A sketch of what that UDF body could look like follows.
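
As a rough illustration only (the bucket and prefix names below are hypothetical placeholders, and this assumes boto3 is available on the worker nodes):

import boto3
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType


@pandas_udf(returnType=IntegerType())
def writeFile(filename: pd.Series, contents: pd.Series) -> pd.Series:
    # One client per Arrow batch of rows, not one per row.
    s3 = boto3.client("s3")
    output_bucket = "my-output-bucket"   # hypothetical destination bucket
    output_prefix = "split-files/"       # hypothetical key prefix
    written = []
    for name, body in zip(filename, contents):
        s3.put_object(Bucket=output_bucket,
                      Key=output_prefix + name,
                      Body=body.encode("utf-8"))
        written.append(len(body))
    # One integer per input row, matching the IntegerType return type.
    return pd.Series(written)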
Also, it looks like you are trying to put each entire object to S3 in a single request; if the data is too large, that will fail. You should look at S3's multipart/streaming upload functionality.
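
For what it's worth, boto3's managed transfer (upload_fileobj) switches to multipart upload automatically above a configurable threshold, so a minimal sketch of a streaming-style upload could look like this (names here are illustrative):

import io

import boto3
from boto3.s3.transfer import TransferConfig


def upload_streaming(bucket: str, key: str, body: str) -> None:
    """Upload via boto3's managed transfer, which uses multipart upload for large payloads."""
    s3 = boto3.client("s3")
    # Force multipart for anything over ~64 MB (tunable).
    config = TransferConfig(multipart_threshold=64 * 1024 * 1024)
    s3.upload_fileobj(io.BytesIO(body.encode("utf-8")), bucket, key, Config=config)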
