我有一个Spark Streaming应用程序,可以从多个Kafka主题中读取数据。每个主题都有不同类型的数据,因此需要不同的处理管道。
我最初的解决方案是为每个主题创建一个DStream:
def main(args: Array[String]) {
val streamingContext: StreamingContext = ...
val topics = ...
for (topic <- topics) {
val offsets: Map[TopicAndPartition, Long] = ...
val stream = KafkaUtils.createDirectStream[...](streamingContext, kafkaProperties, offsets, ...)
configureStream(topic, stream)
}
streamingContext.addStreamingListener(new StreamingListener {
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
// logic to save offsets after each batch completes
}
})
streamingContext.start()
streamingContext.awaitTermination()
}
def configureStream(topic: String, stream: DStream[...]) {
topic match {
case "first" => stream.map(...).foreachRDD(...)
case "second" => stream.map(...).foreachRDD(...)
case "third" => stream.map(...).foreachRDD(...)
// ...
}
}
当运行应用程序时,处理作业是一个接一个地计算的,即使它们最初属于不同的DStream。
我试着调整spark.streaming.concurrentJobs
参数(as stated here),但这就是事情变得奇怪的时候:
- 第一批是处理更多的数据(因为当流应用程序关闭时,数据会在Kafka中积累)。处理时间长于分配的批处理间隔。
- 第二个批次被添加到队列中(第一个批次仍在运行),并立即开始处理。
- 第二批(有时甚至是第三批)在第一批之前就完成了。
这可能会导致问题,例如在管理Kafka偏移量时-流侦听器首先获取第二/第三批的偏移量(因为它首先完成)并保存它们。如果应用程序在完成第一批处理之前崩溃,则数据将丢失。在另一种情况下,如果第一批处理完成并且应用程序随后崩溃,则第二/第三批处理中的数据将被重放。
有没有一种方法可以让Spark在不处理新批的情况下并行处理作业?或者,也许,并行处理不同的DStream(即,一个DStream中的作业被线性处理;跨不同的DStreams并行)?
1条答案
按热度按时间oug3syen1#
这是不可能的Dstream。
Spark结构化流媒体解决了这个问题。
你可以结帐this回答更多信息。