我们正在接收来自Kafka的Spark流数据。一旦在spark streaming中开始执行,它只执行一个批,其余批开始在kafka中排队。
我们的数据是独立的,可以并行处理。
我们尝试了多种配置,包括多个执行器、内核、背压和其他配置,但到目前为止没有任何效果。队列中有许多消息,一次只处理了一个微批处理,其余的仍保留在队列中。
我们希望最大限度地实现并行性,这样就不会有任何微批处理排队,因为我们有足够的可用资源。那么我们如何通过最大限度地利用资源来缩短时间呢。
// Start reading messages from Kafka and get DStream
final JavaInputDStream<ConsumerRecord<String, byte[]>> consumerStream = KafkaUtils.createDirectStream(
getJavaStreamingContext(), LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, byte[]>Subscribe("TOPIC_NAME",
sparkServiceConf.getKafkaConsumeParams()));
ThreadContext.put(Constants.CommonLiterals.LOGGER_UID_VAR, CommonUtils.loggerUniqueId());
JavaDStream<byte[]> messagesStream = consumerStream.map(new Function<ConsumerRecord<String, byte[]>, byte[]>() {
private static final long serialVersionUID = 1L;
@Override
public byte[] call(ConsumerRecord<String, byte[]> kafkaRecord) throws Exception {
return kafkaRecord.value();
}
});
// Decode each binary message and generate JSON array
JavaDStream<String> decodedStream = messagesStream.map(new Function<byte[], String>() {
private static final long serialVersionUID = 1L;
@Override
public String call(byte[] asn1Data) throws Exception {
if(asn1Data.length > 0) {
try (InputStream inputStream = new ByteArrayInputStream(asn1Data);
Writer writer = new StringWriter(); ) {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(asn1Data);
GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);
byte[] buffer = new byte[1024];
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
int len;
while((len = gzipInputStream.read(buffer)) != -1) {
byteArrayOutputStream.write(buffer, 0, len);
}
return new String(byteArrayOutputStream.toByteArray());
} catch (Exception e) {
//
producer.flush();
throw e;
}
}
return null;
}
});
// publish generated json gzip to kafka
cache.foreachRDD(new VoidFunction<JavaRDD<String>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(JavaRDD<String> jsonRdd4DF) throws Exception {
//Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);
if(!jsonRdd4DF.isEmpty()) {
//JavaRDD<String> jsonRddDF = getJavaSparkContext().parallelize(jsonRdd4DF.collect());
Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);
SparkAIRMainJsonProcessor airMainJsonProcessor = new SparkAIRMainJsonProcessor();
airMainJsonProcessor.processAIRData(json, sparkSession);
}
}
});
getJavaStreamingContext().start();
getJavaStreamingContext().awaitTermination();
getJavaStreamingContext().stop();
我们使用的技术:
HDFS 2.7.1.2.5
YARN + MapReduce2 2.7.1.2.5
ZooKeeper 3.4.6.2.5
Ambari Infra 0.1.0
Ambari Metrics 0.1.0
Kafka 0.10.0.2.5
Knox 0.9.0.2.5
Ranger 0.6.0.2.5
Ranger KMS 0.6.0.2.5
SmartSense 1.3.0.0-1
Spark2 2.0.x.2.5
我们从不同实验中得到的统计数据:
实验1
num_executors=6
executor_memory=8g
executor_cores=12
100个文件处理时间48分钟
实验2
spark.default.parallelism=12
num_executors=6
executor_memory=8g
executor_cores=12
100个文件处理时间8分钟
实验3
spark.default.parallelism=12
num_executors=6
executor_memory=8g
executor_cores=12
100个文件处理时间7分钟
实验4
spark.default.parallelism=16
num_executors=6
executor_memory=8g
executor_cores=12
100个文件处理时间10分钟
请告知,我们如何处理最大的,所以没有排队。
4条答案
按热度按时间9udxz4iz1#
如果没有所有的细节很难判断,但是解决这类问题的一般建议——从非常简单的应用程序开始,“hello world”类。只需读取输入流并将数据打印到日志文件中。一旦这项工作,您就可以证明问题出在应用程序中,并逐步添加您的功能,直到找到罪魁祸首。如果哪怕是最简单的应用程序都无法运行,那么您知道配置或spark cluster本身存在的问题。希望这有帮助。
w46czmvw2#
我也面临同样的问题,我用scala futures解决了这个问题。
以下是一些显示如何使用它的链接:
https://alvinalexander.com/scala/how-use-multiple-scala-futures-in-for-comprehension-loop
https://www.beyondthelines.net/computing/scala-future-and-execution-context/
另外,这是我使用scala futures时的一段代码:
guz6ccqo3#
我们希望最大限度地实现并行性,这样就不会有任何微批处理排队
这就是流处理的特点:按照接收数据的顺序处理数据。如果您处理数据的速度慢于数据到达的速度,它将被排队。另外,不要期望一条记录的处理会突然在多个节点上并行化。
从你的截图上看,似乎你的批量时间是10秒,而你的制作人在90秒内发布了100张唱片。
处理2条记录需要36秒,处理17条记录需要70秒。很明显,每个批都有一些开销。如果这种依赖关系是线性的,那么只需4:18就可以在一个小批量中处理所有100条记录,从而击败记录持有者。
因为你的代码不完整,所以很难说到底是什么花了那么多时间。代码中的转换看起来不错,但可能操作(或后续转换)才是真正的瓶颈。还有,这是怎么回事
producer.flush()
在你的代码里什么都没提到?uxhixvfz4#
我也面临同样的问题,为了解决这个问题,我做了一些尝试,得出了以下结论:
首先。直觉说每个执行者必须处理一个批,但相反,一次只处理一个批,但作业和任务是并行处理的。
使用spark.streaming.concurrentjobs可以实现多批处理,但是还没有文档记录,还需要一些修复。问题之一是保存Kafka补偿。假设我们将这个参数设置为4,4个批并行处理,那么如果第3批在第4批之前完成,Kafka偏移量将被提交。如果批处理是独立的,则此参数非常有用。
spark.default.parallelism因为它的名字有时被认为是使事物平行的。但它真正的好处在于分布式洗牌操作。尝试不同的数字,并找到一个最佳的数字。你将得到一个相当大的差异,在处理时间。这取决于你工作中的洗牌操作。设置太高会降低性能。你的实验结果也很明显。
另一个选择是使用foreachpartitionasync代替rdd上的foreach。但我认为foreachpartition更好,因为foreachpartitionasync会对作业进行排队,而批处理看起来会被处理,但它们的作业仍然在队列中或正在处理中。可能是我没有正确使用它。但在我的3个服务中表现相同。
fairspark.scheduler.mode必须用于具有大量任务的作业,因为任务的循环分配给作业,使较小的任务有机会在处理较大的任务时开始接收资源。
尝试调整批处理持续时间+输入大小,并始终将其保持在处理持续时间以下,否则您将看到大量的批处理积压。
这些是我的发现和建议,然而,有这么多的配置和方法做流,往往一套操作不适用于其他人。spark streaming就是学习,把你的经验和期望放在一起,得到一组最佳配置。
希望有帮助。如果有人能具体地告诉我们如何合法地并行处理批处理,那将是一种极大的解脱。