apachebeam-parquetio+sparkrunner(阅读版)

2izufjch  于 2021-05-31  发布在  Hadoop
关注(0)|答案(0)|浏览(201)

我目前正在本地cloudera hadoop集群中使用sparkrunner运行apache beam应用程序,我使用的是apache beam 2.16和apache spark 2.4。我有两个相同管道的版本,一个从avro数据读取,另一个从parquet读取(代码如下)

PCollection<GenericRecord> records = pipeline
  .apply("Reading",ParquetIO.read(SCHEMA)
  .from("/foo/bar"));

records.apply("Writing",AvroIO.writeGenericRecords(SCHEMA)
  .to(options.getOutputPath())
  .withSuffix(".avro"));

我们看到了一个意外的行为之间的执行读取avro和Parquet。阅读avro,dag中有以下几个阶段(截图1)

我们正在读取2gb大小的数据,15个文件(用snappy压缩),但是运行相同的管道(但是读取Parquet),我们看到阶段不同,并且在单个任务中运行了一个额外的重新分区步骤(屏幕截图2)

对于parquet数据,管道的执行失败,因为“工作过载”(单个任务),我尝试添加一些额外的spark标志,但仍然失败。使用该额外标志执行的示例如下:

spark2-submit --master yarn --class my.class.MainClass \
--driver-memory 8G \
--executor-cores 5 \
--driver-cores 5 \
--conf spark.driver.memory=10G \
--conf spark.executor.memory=10G \
--conf spark.executor.memoryOverhead=2000 \
--conf spark.driver.memoryOverhead=2000 \
--conf spark.yarn.am.memoryOverhead=2000 \
--conf spark.serializer=org.apache.spark.serializer.JavaSerializer \
--conf spark.network.timeout=1200 \
--conf spark.speculation=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.maxExecutors=20 \
--conf spark.dynamicAllocation.minExecutors=10 \
--conf spark.shuffle.spill=true \
--conf spark.shuffle.spill.compress=true \
--conf spark.io.compression.codec=snappy \
--conf spark.executor.heartbeatInterval=10000000 \
--conf spark.network.timeout=10000000 \
--conf spark.default.parallelism=100 \
/parent/path/program.jar \
--inputPath="/this/is/the/input/path" \
--outputPath="/this/is/the/output/path" \
--runner=SparkRunner

如果有人能帮我解决读取Parquet地板数据的“工作过载”问题,那将非常有帮助:)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题