Spark driver running in Tomcat 9 fails to load files into the Spark session from a Java parallel stream

rur96b6h · asked 2021-05-29 · Hadoop

We need to load several Parquet files into a Spark session and count the number of records in each file. We want to do this in parallel, so we chose Java 1.8's parallel stream feature, but it throws a Parquet file exception. With a sequential stream there is no problem, but we want to load all the files in parallel.

List<String> list = Arrays.asList(
        "hdfs://NNcluster/finalsnappy/2019/4/8291/table_8291_69_2019_04_01",
        "hdfs://NNcluster/finalsnappy/2019/4/8291/table_8291_69_2019_04_02");
list.parallelStream().forEach(fileName -> {
    SparkSession spark = get_spark_session(); // gets the Spark session
    Dataset<Row> tmpDS = spark.read().format("parquet").load(fileName);
    tmpDS.show();
    System.out.println(tmpDS.count());
});

The exception below is thrown when the parallelStream() method is used:

java.lang.ClassNotFoundException: org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat$$anonfun$11$$anonfun$12
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(ClosureCleaner.scala:429)
        at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
        at org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:85)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:173)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:794)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:793)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:555)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:241)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:176)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:156)
        at org.apache.jsp.test_jsp.lambda$0(test_jsp.java:210)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
        at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
        at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

All the required jars are loaded. If we drop the parallelStream() call and iterate over the list sequentially, there is no problem and it works as expected:

List<String> list = Arrays.asList(
        "hdfs://NNcluster/finalsnappy/2019/4/8291/table_8291_69_2019_04_01",
        "hdfs://NNcluster/finalsnappy/2019/4/8291/table_8291_69_2019_04_02");
list.forEach(fileName -> {
    SparkSession spark = get_spark_session(); // gets the Spark session
    Dataset<Row> tmpDS = spark.read().format("parquet").load(fileName);
    tmpDS.show();
    System.out.println(tmpDS.count());
});

Can anyone help us solve this problem?
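
One possible explanation (an assumption, not confirmed by the post) is that parallelStream() runs its lambdas on the JVM-wide ForkJoinPool.commonPool(), whose worker threads do not carry Tomcat's webapp context classloader, so Spark's ClosureCleaner cannot resolve classes packaged under the webapp's WEB-INF/lib. Below is a minimal sketch of a workaround under that assumption: an explicit thread pool whose worker threads propagate the webapp classloader. get_spark_session() is the same helper used in the question; the pool size and timeout are arbitrary choices for the sketch.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Capture the webapp classloader from the Tomcat request thread.
final ClassLoader webappClassLoader = Thread.currentThread().getContextClassLoader();

// Fixed pool whose worker threads inherit the webapp classloader,
// unlike the JVM-wide common ForkJoinPool used by parallelStream().
ExecutorService pool = Executors.newFixedThreadPool(2, runnable -> {
    Thread t = new Thread(runnable);
    t.setContextClassLoader(webappClassLoader);
    t.setDaemon(true);
    return t;
});

List<String> list = Arrays.asList(
        "hdfs://NNcluster/finalsnappy/2019/4/8291/table_8291_69_2019_04_01",
        "hdfs://NNcluster/finalsnappy/2019/4/8291/table_8291_69_2019_04_02");

for (String fileName : list) {
    pool.submit(() -> {
        SparkSession spark = get_spark_session(); // helper from the question
        Dataset<Row> tmpDS = spark.read().format("parquet").load(fileName);
        System.out.println(fileName + " -> " + tmpDS.count());
    });
}

pool.shutdown();
try {
    pool.awaitTermination(1, TimeUnit.HOURS); // wait for both loads to finish
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}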

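A different angle, again only a sketch: avoid driver-side threading altogether and let Spark parallelize the scan by passing both paths to a single read (this assumes the Parquet files share a compatible schema). Per-file record counts can then be derived with input_file_name().

import static org.apache.spark.sql.functions.input_file_name;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = get_spark_session(); // helper from the question

// One read over both directories; Spark distributes the scan across executors.
Dataset<Row> all = spark.read().parquet(
        "hdfs://NNcluster/finalsnappy/2019/4/8291/table_8291_69_2019_04_01",
        "hdfs://NNcluster/finalsnappy/2019/4/8291/table_8291_69_2019_04_02");

// Counts are grouped by the underlying part-file path; the parent directory
// can be extracted from that path if per-directory totals are needed.
all.groupBy(input_file_name().alias("source_file")).count().show(false);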