我正在尝试从s3 bucket加载一个大的(大于系统内存的)json来触发Dataframe。但低于错误。
org.apache.http.connectionclosedexception:内容长度分隔的消息正文过早结束(预期:138345560;收到:67924532
我正在使用spark2.4.7和hadoop2.7发行版,以及python3.7。
我复制了hadoop-aws:2.7.3.jar美国焊接学会-sdk:1.7.4.jar“在
sparkhome/jars/目录。已经在使用jdk1.8(set spark\u home、java\u home、hadoop\u home)环境变量了。
因为这个错误,以前的Python3.8、Java15、Spark3.0和Hadoop2.7都被降级了。
# pyspark_job.py
from pyspark.sql import SparkSession
def create_spark_session():
"""
Create spark session.
Returns:
spark (SparkSession) - spark session
"""
spark = SparkSession \
.builder \
.master("local[4]") \
.appName("Myappname") \
.getOrCreate()
return spark
def setConfigs(spark):
"""
setting the environment configuration properties for s3 access
arguments:
spark (SparkSession) - spark session
"""
ak = 'accesskey' # as of now I am using the keys directly, once i get the basic functionality working, the keys will be taken from property file/env variable
sk = 'secretkey'
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", ak)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", sk)
sc._jsc.hadoopConfiguration().set("fs.s3a.readahead.range", '512M')
def process_events_data(spark, input_path):
"""Process the event data from s3 bucket as json files and get specific info.
Arguments:
spark (SparkSession) - spark session connected to local cluster
input_path (str) - AWS S3 bucket path for source data
returns
Data Frame with event data
"""
df = spark.read.format('json').load(input_path)
#df = spark.read.json("input_path")
return df
def main():
bn = 'bucketname'
sf = 'prefix/'
fp = bn+sf
input_path = 's3a://bucketname/prefix/filename.json'
spark = create_spark_session()
setConfigs(spark)
edf = process_events_data(spark, input_path)
if __name__ == '__main__':
main()
错误跟踪:
20/09/29 22:32:35警告tasksetmanager:阶段0.0中的任务1.0丢失(tid 1,localhost,executor driver):taskkilled(阶段取消)回溯(最后一次调用):文件“pyspark\u job.py”,第61行,in main()文件“pyspark\u job.py”,第58行,in main process\u events\u data(spark,input\u path)文件“pyspark\u job.py”,第41行,进程内\u事件\u数据df=spark.read.format('json').load(输入\u路径)文件“spark \u dir\spark-2.4.7 \u h2.7\python\lib\pyspark.zip\pyspark\sql\readwriter.py”,第166行,加载文件“spark \u dir\spark-2.4.7 \u h2.7\python\lib\py4j-0.10.7-src.zip\py4j\java \u gateway.py”,第1257行,在调用文件“spark\u dir\spark-2.4.7\u h2.7\python\lib\pyspark.zip\pyspark\sql\utils.py”的第63行,在deco文件“spark\u dir\spark-2.4.7\u h2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py”的第328行,在get\u return\u value py4j.protocol.py4jjavaerror:调用o42.load时出错:org.apache.spark.sparkexception:作业因阶段失败而中止:阶段0.0中的任务0失败1次,最近的失败:阶段0.0中的任务0.0丢失(tid 0,localhost,executor driver):org.apache.http.connectionclosedexception:内容长度分隔的消息正文过早结束(预期:138345560;收到:71284276
已经尝试了以下解决方案,但仍然出现错误:在使用pyspark从s3读取时,内容长度分隔的消息体引发了异常
我不确定是否缺少某些配置?或者s3那边出了什么事?或者我需要逐行读取json输入吗?这是我的第一个星火计划,有人能帮忙吗。
暂无答案!
目前还没有任何答案,快来回答吧!