spark流:微批并行执行

rn0zuynd  于 2021-05-29  发布在  Hadoop
关注(0)|答案(4)|浏览(407)

我们正在接收来自Kafka的Spark流数据。一旦在spark streaming中开始执行,它只执行一个批,其余批开始在kafka中排队。
我们的数据是独立的,可以并行处理。
我们尝试了多种配置,包括多个执行器、内核、背压和其他配置,但到目前为止没有任何效果。队列中有许多消息,一次只处理了一个微批处理,其余的仍保留在队列中。
我们希望最大限度地实现并行性,这样就不会有任何微批处理排队,因为我们有足够的可用资源。那么我们如何通过最大限度地利用资源来缩短时间呢。

// Start reading messages from Kafka and get DStream
final JavaInputDStream<ConsumerRecord<String, byte[]>> consumerStream = KafkaUtils.createDirectStream(
        getJavaStreamingContext(), LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, byte[]>Subscribe("TOPIC_NAME",
                sparkServiceConf.getKafkaConsumeParams()));

ThreadContext.put(Constants.CommonLiterals.LOGGER_UID_VAR, CommonUtils.loggerUniqueId());

JavaDStream<byte[]> messagesStream = consumerStream.map(new Function<ConsumerRecord<String, byte[]>, byte[]>() {
    private static final long serialVersionUID = 1L;
    @Override
    public byte[] call(ConsumerRecord<String, byte[]> kafkaRecord) throws Exception {
        return kafkaRecord.value();
    }
});

    // Decode each binary message and generate JSON array
        JavaDStream<String> decodedStream = messagesStream.map(new Function<byte[], String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(byte[] asn1Data) throws Exception {
                if(asn1Data.length > 0) {
                    try (InputStream inputStream = new ByteArrayInputStream(asn1Data);
                            Writer writer = new StringWriter(); ) {

                        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(asn1Data);
                        GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);

                        byte[] buffer = new byte[1024];
                        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

                        int len;
                        while((len = gzipInputStream.read(buffer)) != -1) {
                            byteArrayOutputStream.write(buffer, 0, len);
                        }

                        return new String(byteArrayOutputStream.toByteArray());

                    } catch (Exception e) {
//                      
                        producer.flush();

                        throw e;
                    }
                } 

                return null;
            }
        });

// publish generated json gzip to kafka 
        cache.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(JavaRDD<String> jsonRdd4DF) throws Exception {
                //Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);
                if(!jsonRdd4DF.isEmpty()) {
                    //JavaRDD<String> jsonRddDF = getJavaSparkContext().parallelize(jsonRdd4DF.collect());
                    Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);   

                    SparkAIRMainJsonProcessor airMainJsonProcessor = new SparkAIRMainJsonProcessor();
                    airMainJsonProcessor.processAIRData(json, sparkSession);
                }

            }               
        });

        getJavaStreamingContext().start();
        getJavaStreamingContext().awaitTermination();
        getJavaStreamingContext().stop();

我们使用的技术:

HDFS  2.7.1.2.5 
YARN + MapReduce2  2.7.1.2.5 
ZooKeeper  3.4.6.2.5 
Ambari Infra  0.1.0 
Ambari Metrics  0.1.0 
Kafka  0.10.0.2.5 
Knox  0.9.0.2.5 
Ranger  0.6.0.2.5 
Ranger KMS  0.6.0.2.5 
SmartSense  1.3.0.0-1
Spark2  2.0.x.2.5

我们从不同实验中得到的统计数据:
实验1

num_executors=6
executor_memory=8g
executor_cores=12

100个文件处理时间48分钟
实验2

spark.default.parallelism=12
num_executors=6
executor_memory=8g
executor_cores=12

100个文件处理时间8分钟
实验3

spark.default.parallelism=12
num_executors=6
executor_memory=8g
executor_cores=12

100个文件处理时间7分钟
实验4

spark.default.parallelism=16
num_executors=6
executor_memory=8g
executor_cores=12

100个文件处理时间10分钟
请告知,我们如何处理最大的,所以没有排队。

9udxz4iz

9udxz4iz1#

如果没有所有的细节很难判断,但是解决这类问题的一般建议——从非常简单的应用程序开始,“hello world”类。只需读取输入流并将数据打印到日志文件中。一旦这项工作,您就可以证明问题出在应用程序中,并逐步添加您的功能,直到找到罪魁祸首。如果哪怕是最简单的应用程序都无法运行,那么您知道配置或spark cluster本身存在的问题。希望这有帮助。

w46czmvw

w46czmvw2#

我也面临同样的问题,我用scala futures解决了这个问题。
以下是一些显示如何使用它的链接:
https://alvinalexander.com/scala/how-use-multiple-scala-futures-in-for-comprehension-loop
https://www.beyondthelines.net/computing/scala-future-and-execution-context/
另外,这是我使用scala futures时的一段代码:

messages.foreachRDD{ rdd =>
  val f = Future {
  //        sleep(100)
    val newRDD = rdd.map{message => 
                           val req_message = message.value()  
                           (message.value())
                        }

    println("Request messages: " + newRDD.count())         
    var resultrows = newRDD.collect()//.collectAsList() 
    processMessage(resultrows, mlFeatures: MLFeatures, conf)          
    println("Inside scala future")
    1          
  }
  f.onComplete {
    case Success(messages) => println("yay!")
    case Failure(exception) => println("On no!")
  }  
}
guz6ccqo

guz6ccqo3#

我们希望最大限度地实现并行性,这样就不会有任何微批处理排队
这就是流处理的特点:按照接收数据的顺序处理数据。如果您处理数据的速度慢于数据到达的速度,它将被排队。另外,不要期望一条记录的处理会突然在多个节点上并行化。
从你的截图上看,似乎你的批量时间是10秒,而你的制作人在90秒内发布了100张唱片。
处理2条记录需要36秒,处理17条记录需要70秒。很明显,每个批都有一些开销。如果这种依赖关系是线性的,那么只需4:18就可以在一个小批量中处理所有100条记录,从而击败记录持有者。
因为你的代码不完整,所以很难说到底是什么花了那么多时间。代码中的转换看起来不错,但可能操作(或后续转换)才是真正的瓶颈。还有,这是怎么回事 producer.flush() 在你的代码里什么都没提到?

uxhixvfz

uxhixvfz4#

我也面临同样的问题,为了解决这个问题,我做了一些尝试,得出了以下结论:
首先。直觉说每个执行者必须处理一个批,但相反,一次只处理一个批,但作业和任务是并行处理的。
使用spark.streaming.concurrentjobs可以实现多批处理,但是还没有文档记录,还需要一些修复。问题之一是保存Kafka补偿。假设我们将这个参数设置为4,4个批并行处理,那么如果第3批在第4批之前完成,Kafka偏移量将被提交。如果批处理是独立的,则此参数非常有用。

spark.default.parallelism因为它的名字有时被认为是使事物平行的。但它真正的好处在于分布式洗牌操作。尝试不同的数字,并找到一个最佳的数字。你将得到一个相当大的差异,在处理时间。这取决于你工作中的洗牌操作。设置太高会降低性能。你的实验结果也很明显。

另一个选择是使用foreachpartitionasync代替rdd上的foreach。但我认为foreachpartition更好,因为foreachpartitionasync会对作业进行排队,而批处理看起来会被处理,但它们的作业仍然在队列中或正在处理中。可能是我没有正确使用它。但在我的3个服务中表现相同。
fairspark.scheduler.mode必须用于具有大量任务的作业,因为任务的循环分配给作业,使较小的任务有机会在处理较大的任务时开始接收资源。
尝试调整批处理持续时间+输入大小,并始终将其保持在处理持续时间以下,否则您将看到大量的批处理积压。
这些是我的发现和建议,然而,有这么多的配置和方法做流,往往一套操作不适用于其他人。spark streaming就是学习,把你的经验和期望放在一起,得到一组最佳配置。
希望有帮助。如果有人能具体地告诉我们如何合法地并行处理批处理,那将是一种极大的解脱。

相关问题