如何在启动SparkStreaming进程时加载历史数据,并计算正在运行的聚合

ogq8wdun  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(420)

我的elasticsearch集群中有一些与销售相关的json数据,我想使用spark streaming(使用spark 1.4.1)通过kafka动态聚合来自我的电子商务网站的传入销售事件,以查看用户的总销售额(就收入和产品而言)。
从我阅读的文档中,我还不清楚的是如何在spark应用程序启动时从elasticsearch加载历史数据,以及如何计算每个用户的总收入(基于历史和来自kafka的收入)。
我有以下(工作)代码连接到我的kafka示例并接收json文档:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

object ReadFromKafka {
  def main(args: Array[String]) {

    val checkpointDirectory = "/tmp"
    val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[2]")
    val topicsSet = Array("tracking").toSet

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // Create direct kafka stream with brokers and topics
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    //Iterate
    messages.foreachRDD { rdd =>

      //If data is present, continue
      if (rdd.count() > 0) {

        //Create SQLContect and parse JSON
        val sqlContext = new SQLContext(sc)
        val trackingEvents = sqlContext.read.json(rdd.values)

        //Sample aggregation of incoming data
        trackingEvents.groupBy("type").count().show()

      }

    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

我知道elasticsearch有个插件(https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-read),但我还不清楚如何集成启动时读取和流计算过程,以将历史数据与流数据聚合。
我很感激你的帮助!提前谢谢。

v1l68za4

v1l68za41#

RDD是不可变的,因此在它们被创建之后,您不能向它们添加数据,例如用新事件更新收入。
您可以做的是将现有数据与新事件合并,以创建一个新的rdd,然后将其用作当前总数。例如。。。

var currentTotal: RDD[(Key, Value)] = ... //read from ElasticSearch
messages.foreachRDD { rdd =>
    currentTotal = currentTotal.union(rdd)
}

在这种情况下,我们 currentTotalvar 因为当它与传入数据联合时,它将被对新rdd的引用所取代。
在并集之后,您可能希望执行一些进一步的操作,例如减少属于同一个键的值,但是您得到了这样的结果。
如果您使用这种技术,请注意您的rdd的沿袭将会增长,因为每个新创建的rdd都将引用它的父rdd。这可能会导致堆栈溢出样式沿袭问题。要解决这个问题,你可以打电话 checkpoint() 定期在rdd上。

相关问题