“内容长度分隔的消息体过早结束”-使用pyspark从s3 bucket访问json文件时出错

whhtz7ly  于 2021-05-24  发布在  Spark
关注(0)|答案(0)|浏览(304)

我正在尝试从s3 bucket加载一个大的(大于系统内存的)json来触发Dataframe。但低于错误。
org.apache.http.connectionclosedexception:内容长度分隔的消息正文过早结束(预期:138345560;收到:67924532
我正在使用spark2.4.7和hadoop2.7发行版,以及python3.7。
我复制了hadoop-aws:2.7.3.jar美国焊接学会-sdk:1.7.4.jar“在
sparkhome/jars/目录。已经在使用jdk1.8(set spark\u home、java\u home、hadoop\u home)环境变量了。
因为这个错误,以前的Python3.8、Java15、Spark3.0和Hadoop2.7都被降级了。


# pyspark_job.py

from pyspark.sql import SparkSession

def create_spark_session():
    """
    Create spark session.
    Returns:
        spark (SparkSession) - spark session
    """
    spark = SparkSession \
        .builder \
        .master("local[4]") \
        .appName("Myappname") \
        .getOrCreate()
    return spark

def setConfigs(spark):
    """
    setting the environment configuration properties for s3 access
    arguments:
        spark (SparkSession) - spark session
    """
    ak = 'accesskey'    # as of now I am using the keys directly, once i get the basic functionality working, the keys will be taken from property file/env variable
    sk = 'secretkey'
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set("fs.s3a.access.key",  ak)
    sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", sk)
    sc._jsc.hadoopConfiguration().set("fs.s3a.readahead.range", '512M')

def process_events_data(spark, input_path):
    """Process the event data from s3 bucket as json files and get specific info.
    Arguments:
        spark (SparkSession) - spark session connected to local cluster
        input_path (str) - AWS S3 bucket path for source data
    returns
        Data Frame with event data
    """
    df = spark.read.format('json').load(input_path)
    #df = spark.read.json("input_path")
    return df

def main():
    bn = 'bucketname'
    sf = 'prefix/'
    fp = bn+sf
    input_path = 's3a://bucketname/prefix/filename.json'

    spark = create_spark_session()
    setConfigs(spark)

    edf = process_events_data(spark, input_path)

if __name__ == '__main__':
    main()

错误跟踪:
20/09/29 22:32:35警告tasksetmanager:阶段0.0中的任务1.0丢失(tid 1,localhost,executor driver):taskkilled(阶段取消)回溯(最后一次调用):文件“pyspark\u job.py”,第61行,in main()文件“pyspark\u job.py”,第58行,in main process\u events\u data(spark,input\u path)文件“pyspark\u job.py”,第41行,进程内\u事件\u数据df=spark.read.format('json').load(输入\u路径)文件“spark \u dir\spark-2.4.7 \u h2.7\python\lib\pyspark.zip\pyspark\sql\readwriter.py”,第166行,加载文件“spark \u dir\spark-2.4.7 \u h2.7\python\lib\py4j-0.10.7-src.zip\py4j\java \u gateway.py”,第1257行,在调用文件“spark\u dir\spark-2.4.7\u h2.7\python\lib\pyspark.zip\pyspark\sql\utils.py”的第63行,在deco文件“spark\u dir\spark-2.4.7\u h2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py”的第328行,在get\u return\u value py4j.protocol.py4jjavaerror:调用o42.load时出错:org.apache.spark.sparkexception:作业因阶段失败而中止:阶段0.0中的任务0失败1次,最近的失败:阶段0.0中的任务0.0丢失(tid 0,localhost,executor driver):org.apache.http.connectionclosedexception:内容长度分隔的消息正文过早结束(预期:138345560;收到:71284276
已经尝试了以下解决方案,但仍然出现错误:在使用pyspark从s3读取时,内容长度分隔的消息体引发了异常
我不确定是否缺少某些配置?或者s3那边出了什么事?或者我需要逐行读取json输入吗?这是我的第一个星火计划,有人能帮忙吗。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题