scala 并行运行不同DStreams的多个Spark Streaming作业

vaj7vani  于 2023-10-18  发布在  Scala
关注(0)|答案(1)|浏览(119)

我有一个Spark Streaming应用程序,可以从多个Kafka主题中读取数据。每个主题都有不同类型的数据,因此需要不同的处理管道。
我最初的解决方案是为每个主题创建一个DStream:

def main(args: Array[String]) { 
    val streamingContext: StreamingContext = ...
    val topics = ...

    for (topic <- topics) {
        val offsets: Map[TopicAndPartition, Long] = ...
        val stream = KafkaUtils.createDirectStream[...](streamingContext, kafkaProperties, offsets, ...)
        configureStream(topic, stream)
    }

    streamingContext.addStreamingListener(new StreamingListener {
        override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
            // logic to save offsets after each batch completes
        }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
}

def configureStream(topic: String, stream: DStream[...]) {
    topic match {
        case "first" => stream.map(...).foreachRDD(...)
        case "second" => stream.map(...).foreachRDD(...)
        case "third" => stream.map(...).foreachRDD(...)
        // ...
    }
}

当运行应用程序时,处理作业是一个接一个地计算的,即使它们最初属于不同的DStream。
我试着调整spark.streaming.concurrentJobs参数(as stated here),但这就是事情变得奇怪的时候:

  • 第一批是处理更多的数据(因为当流应用程序关闭时,数据会在Kafka中积累)。处理时间长于分配的批处理间隔。
  • 第二个批次被添加到队列中(第一个批次仍在运行),并立即开始处理。
  • 第二批(有时甚至是第三批)在第一批之前就完成了。

这可能会导致问题,例如在管理Kafka偏移量时-流侦听器首先获取第二/第三批的偏移量(因为它首先完成)并保存它们。如果应用程序在完成第一批处理之前崩溃,则数据将丢失。在另一种情况下,如果第一批处理完成并且应用程序随后崩溃,则第二/第三批处理中的数据将被重放。
有没有一种方法可以让Spark在不处理新批的情况下并行处理作业?或者,也许,并行处理不同的DStream(即,一个DStream中的作业被线性处理;跨不同的DStreams并行)?

oug3syen

oug3syen1#

这是不可能的Dstream。
Spark结构化流媒体解决了这个问题。
你可以结帐this回答更多信息。

相关问题