I am reading a deeply nested Avro dataset in a PySpark application running on YARN. My goal is to repartition it and write it out as Parquet files, but I keep running into GC / out-of-memory errors.
I launch the job with spark-submit through a wrapper library that hides some of the details, but I can share my Spark configuration, my PySpark code, and an executor summary. This configuration is passed to spark-submit when I start the driver; a rough sketch of what an equivalent hand-built session would look like is included after the config list below.
My Spark conf, according to pm.spark.sparkContext.getConf().getAll():
(I don't know how to collapse this list, sorry)
spark.eventLog.enabled=true
spark.network.crypto.enabled=true
spark.sql.queryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
spark.authenticate.sasl.encryption.aes.enabled=true
spark.authenticate.enableSaslEncryption=true
spark.ui.proxyBase=/proxy/application_1598440892293_34498
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.driver.host=.x.com
spark.sql.hive.metastore.jars=${env:HADOOP_COMMON_HOME}/../hive/lib/*:${env:HADOOP_COMMON_HOME}/client/*
spark.dynamicAllocation.maxExecutors=30
spark.executorEnv.PYTHONPATH=/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/py4j-0.10.7-src.zip:/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/:/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/py4j-0.10.7-src.zip:/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/lib/pyspark.zip
spark.ui.filters=org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
spark.driver.memory=30g
spark.driver.extraLibraryPath=/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hadoop/lib/native
spark.driver.memoryOverhead=9g
spark.ui.enabled=true
spark.driver.appUIAddress=http://x.x.com:4045
spark.executor.id=driver
spark.dynamicAllocation.schedulerBacklogTimeout=1
spark.app.id=application_1598440892293_34498
spark.yarn.jars=local:/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/jars/*
spark.app.name=PySparkShell
spark.sql.hive.metastore.version=1.1.0
spark.yarn.config.gatewayPath=/opt/cloudera/parcels
spark.extraListeners=com.cloudera.spark.lineage.NavigatorAppListener
spark.shuffle.encryption.enabled=true
spark.sql.warehouse.dir=/user/hive/warehouse
spark.sql.catalogImplementation=hive
spark.driver.extraClassPath=/apps/oracle/product/12.1.0.2/jdbc/lib/ojdbc7.jar:/path/to/spark-avro_2.11-2.4.0.jar
spark.yarn.config.replacementPath={{HADOOP_COMMON_HOME}}/../../..
spark.yarn.historyServer.address=http://x.x.com:18089
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.RM_HA_URLS=x.x.com:8090,x.x.com:8090
spark.authenticate=true
spark.port.maxRetries=1000
spark.lineage.log.dir=/hadooplog/spark2/lineage
spark.eventLog.dir=hdfs://nameservice1/user/spark/spark2ApplicationHistory
spark.sql.files.maxPartitionBytes=67108864
spark.ui.killEnabled=true
spark.yarn.secondary.jars=ojdbc7.jar
spark.executor.cores=4
spark.dynamicAllocation.executorIdleTimeout=60
spark.yarn.am.extraLibraryPath=/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hadoop/lib/native
spark.io.encryption.enabled=true
spark.dynamicAllocation.initialExecutors=3
spark.serializer.objectStreamReset=100
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES=https://lxe1731.allstate.com:8090/proxy/application_1598440892293_34498,https://x.x.com:8090/proxy/application_1598440892293_34498
spark.submit.deployMode=client
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS=x.x.com,x.x.com
spark.io.compression.codec=snappy
spark.shuffle.service.enabled=true
spark.executor.memory=10g
spark.repl.local.jars=file:///apps/oracle/product/12.1.0.2/jdbc/lib/ojdbc7.jar
spark.driver.port=41668
spark.executor.extraLibraryPath=/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hadoop/lib/native
spark.shuffle.service.port=7337
spark.unsafe.sorter.spill.read.ahead.enabled=false
spark.lineage.enabled=true
spark.master=yarn
spark.debug.maxToStringFields=10000
spark.rdd.compress=true
spark.dynamicAllocation.minExecutors=0
spark.yarn.isPython=true
spark.dynamicAllocation.enabled=true
spark.executor.extraClassPath=/apps/oracle/product/12.1.0.2/jdbc/lib/ojdbc7.jar:/home/rmchh/jars/spark-avro_2.11-2.4.0.jar
spark.ui.showConsoleProgress=true
spark.yarn.dist.jars=file:///apps/oracle/product/12.1.0.2/jdbc/lib/ojdbc7.jar
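As mentioned above, the wrapper hides how the session is actually constructed. For context, if I were to apply the main memory/parallelism settings from that list by hand, it would look roughly like the sketch below (illustrative only, not the wrapper's real code; the app name is made up):

from pyspark.sql import SparkSession

# Sketch only -- PySparkManager normally builds the session; these are just
# the memory/parallelism settings taken from getConf().getAll() above.
spark = (
    SparkSession.builder
    .appName("avro-to-parquet-repartition")  # illustrative name
    .config("spark.driver.memory", "30g")
    .config("spark.driver.memoryOverhead", "9g")
    .config("spark.executor.memory", "10g")
    .config("spark.executor.cores", "4")
    .config("spark.dynamicAllocation.maxExecutors", "30")
    .config("spark.sql.files.maxPartitionBytes", "67108864")
    .getOrCreate()
)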
Gist of my PySpark driver code:
import json

from SparkWrapper import PySparkManager
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField

# Create StructType schema from json file:
with open('schema/pmdm_final_schema.json') as f:
    json_schema = json.load(f)
schema = StructType.fromJson(json_schema)

pm = PySparkManager()
spark = pm.spark

path_to_partition = "/data/res/warehouse/mydata/partition_3/APL_00000.avro"
df = spark.read.format("avro").schema(schema).load(path_to_partition).limit(10)
df.cache()

# Repartition and write as parquet
df.repartition(15).write.mode("overwrite").parquet("/data/res/warehouse/mydata/as_parquet")
Unfortunately, this repartition-and-write never succeeds. It looks like only a single task gets created. I don't know how to make sure multiple tasks are created so that my executors and their memory are actually used.
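For reference, this is roughly how I check how many partitions (and therefore write tasks) I end up with, using the same df as in the gist above; getNumPartitions is standard PySpark, the rest is just illustrative:

# Number of partitions the DataFrame has as read (before the shuffle)
print("before repartition:", df.rdd.getNumPartitions())

# Number of partitions the repartitioned DataFrame reports (expected: 15)
print("after repartition: ", df.repartition(15).rdd.getNumPartitions())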