在我的glue-pyspark工作中,我试图从s3读取一个大json文件(大约87gb)。我必须删除这个文件的重复,打破这个文件成多个较小的文件,然后保存回s3。当我试图这样做,通过运行下面的工作。我正在处理资源问题。有什么方法可以优化它吗?
谢谢你事先的帮助。
from pyspark.sql import SparkSession
if __name__ == '__main__':
app_name = "test"
spark = SparkSession.builder.appName(app_name).getOrCreate()
DATA_FILE_PATH = 's3://test//ids_20200606_173121.gz'
output_FILE_PATH = 's3://output/data/y=2020/m=06/d=10'
device_graph_df = spark.read.json(DATA_FILE_PATH)
distinct_device_graph_df = device_graph_df.dropDuplicates(['ip'])
device_graph_df = distinct_device_graph_df.repartition(40)
distinct_device_graph_df.write.parquet(output_FILE_PATH )
错误
命令失败,退出代码为1-Yarn资源管理器已终止spark应用程序,请参阅spark驱动程序日志/度量以获取诊断信息[任务5]client.yarnclient(yarnclient)的执行者任务启动工作进程。java:makerestapirequest(66))-获取url的请求失败http://0.0.0.0:8088/ws/v1/cluster/apps/application\u 1591879099247\u 0001 com.amazon.ws.emr.hadoop.fs.shade.org.apache.http.conn.httphostconnectexception:连接到0.0.0:8088[/0.0.0.0]失败:连接被拒绝(连接被拒绝)位于com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.defaulthttpclientconnectionoperator.connect(defaulthttpclientconnectionoperator)。java:158)
2条答案
按热度按时间mpgws1up1#
你可以尝试两种选择
使用合并而不是重新分区。
先在ip上重新分区,然后执行重复数据消除。
6jjcrrmo2#
您有几个问题:
非常大的json文件是用gzip压缩的,这使得文件不可拆分,所有的文件只需要一个执行器来处理(不管您的作业是否配置了更多的worker)。要解决这个问题,您可以解压缩文件。如果你需要压缩文件来处理它,那么你可以尝试bzip2或lzo,它们在hadoop中是标准的,但是我没有在glue中使用它们。为了能够读取数据,这几乎是必须的(仅在一个节点中处理87gb的压缩数据将需要glue无法提供的大量内存)。
spark需要读取json文件两次,一次用于推断模式,另一次用于处理数据,87 gb的数据可能是一个挑战。要将此最小化,您有两个选择:
a) 如果您知道json记录的模式,那么您可以提供以下模式:
b) 仅读取部分数据以推断模式(例如,十分之一):