How do I load sequence files (from HDFS) in parallel and process each sequence file in parallel with Spark?

Asked by q0qdq0h2 on 2021-06-03 in Hadoop

I need to load HDFS files in parallel and also process each file in parallel (read it and filter it on some condition). The code below loads the files serially. The Spark application runs with three workers (4 cores each). I even tried setting the partition parameter in the parallelize method (a sketch of that variant follows the code below), but performance did not improve. I am sure my cluster has enough resources to run these jobs in parallel. What should I change to make this parallel?

    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    sparkConf.set("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer");
    JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
    JavaRDD<String> files = sparkContext.parallelize(fileList);
    // toLocalIterator() pulls the file paths back to the driver one partition at a time,
    // so the while loop below handles a single sequence file per iteration, serially.
    Iterator<String> localIterator = files.toLocalIterator();

    while (localIterator.hasNext())
    {
        String hdfsPath = localIterator.next();
        long startTime = DateUtil.getCurrentTimeMillis();
        JavaPairRDD<IntWritable, BytesWritable> hdfsContent = sparkContext.sequenceFile(hdfsPath, IntWritable.class, BytesWritable.class);

        try
        {
            JavaRDD<Message> logs = hdfsContent.map(new Function<Tuple2<IntWritable, BytesWritable>, Message>()
            {
                public Message call(Tuple2<IntWritable, BytesWritable> tuple2) throws Exception
                {
                    BytesWritable value = tuple2._2();
                    // Copy into a BytesWritable sized exactly to the record, so getBytes()
                    // returns an array without trailing padding before protobuf parsing.
                    BytesWritable tmp = new BytesWritable();
                    tmp.setCapacity(value.getLength());
                    tmp.set(value);
                    return (Message) getProtos(logtype, tmp.getBytes());
                }
            });

            final JavaRDD<Message> filteredLogs = logs.filter(new Function<Message, Boolean>()
            {
                public Boolean call(Message msg) throws Exception
                {
                    FieldDescriptor fd = msg.getDescriptorForType().findFieldByName("method");
                    String value = (String) msg.getField(fd);
                    // Keep only POST requests; "POST".equals(value) also guards against a null field.
                    return "POST".equals(value);
                }
            });

            // count() is an action; run both jobs before measuring elapsed time so that
            // the logged duration actually includes the Spark work for this file.
            long totalCount = logs.count();
            long filteredCount = filteredLogs.count();
            long timetaken = DateUtil.getCurrentTimeMillis() - startTime;
            LOGGER.log(Level.INFO, "HDFS: {0} Total Log Count : {1} Filtered Log Count : {2} TimeTaken : {3}", new Object[] { hdfsPath, totalCount, filteredCount, timetaken });

        }
        catch (Exception e)
        {
            LOGGER.log(Level.INFO, "Exception : ", e);
        }
    }
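
For reference, the partition count mentioned above is passed as the second argument to parallelize; a minimal sketch of that variant (the numPartitions value is an assumed example, not taken from the question):

    // Sketch only: numPartitions is an assumed value (e.g. 3 workers x 4 cores).
    // It controls how many partitions the RDD of paths gets, but it does not
    // parallelize the driver-side while loop above, which still runs one file at a time.
    int numPartitions = 12;
    JavaRDD<String> files = sparkContext.parallelize(fileList, numPartitions);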

Instead of iterating over the RDD of file paths, I also tried Spark functions such as map and foreach. But they throw the Spark exception below. No external variable is referenced in the closure, my class (OldLogAnalyzer) already implements the Serializable interface, and both KryoSerializer and JavaSerializer are configured in the SparkConf. I am confused about what in my code is not serializable. (A sketch of the pattern the serialization stack points to follows the trace below.)

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1622)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:81)
    at org.apache.spark.api.java.JavaRDD.map(JavaRDD.scala:32)
    at com.test.logs.spark.OldLogAnalyzer.main(OldLogAnalyzer.java:423)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
    Serialization stack:
    - object not serializable (class: org.apache.spark.api.java.JavaSparkContext, value: org.apache.spark.api.java.JavaSparkContext@68f277a2)
    - field (class: com.test.logs.spark.OldLogAnalyzer$10, name: val$sparkContext, type: class org.apache.spark.api.java.JavaSparkContext)
    - object (class com.test.logs.spark.OldLogAnalyzer$10, com.test.logs.spark.OldLogAnalyzer$10@2f80b005)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 15 more
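
Judging from the serialization stack above, the anonymous function (OldLogAnalyzer$10) captured a field named val$sparkContext, i.e. the JavaSparkContext itself was pulled into the closure. A rough reconstruction of the pattern that produces this error is shown below; it is an assumption based on the trace, not the code from the question:

    // Hypothetical reconstruction (inferred from the stack trace, not the original code):
    // referencing sparkContext inside the Function captures the non-serializable
    // JavaSparkContext, so ClosureCleaner fails while serializing the closure at map() time.
    JavaRDD<String> files = sparkContext.parallelize(fileList);
    JavaRDD<Long> counts = files.map(new Function<String, Long>()
    {
        public Long call(String hdfsPath) throws Exception
        {
            // A SparkContext can only be used on the driver; even if this closure were
            // serializable, sequenceFile() could not be called inside an executor task.
            JavaPairRDD<IntWritable, BytesWritable> hdfsContent =
                    sparkContext.sequenceFile(hdfsPath, IntWritable.class, BytesWritable.class);
            return hdfsContent.count();
        }
    });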
