dstream[double]到dstream scala

yebdmbv4  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(210)

我正在开发spark consumer应用程序,它正在使用来自kafka broker的消息,我想找到spark consumer收到的消息的平均值,最后我想将平均值存储到cassandra中。

val Array(brokers, topics) = args

val sparkConf = new SparkConf().setAppName("MyDirectKafkaWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(20))

val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

val lines = messages.map(_._2)
val count = lines.count()
count.print()
val total = lines.map(JSON.parseFull(_)
    .asInstanceOf[Option[List[Map[String,List[Map[String,Double]]]]]]
    .map(_(1)("MeterData").map(_("mVolts1"))).getOrElse(List())).flatMap(list => list).reduce((x,y) => x+y)
total.print()

val avg = total.reduce((total,count) => total / count )
avg.print()
ssc.start()
ssc.awaitTermination()

在上面的代码中,我得到的是total和count,这正是我所期望的,但是我无法计算平均值,因为count是dstream[long],total是dstream[double]。
我觉得这条线有点问题。”val avg=total.reduce((total,count)=>total/count)“感谢您的帮助。
output:count:这是作为dstream[long]total在流中获得的计数输出:这是作为dstream[double]在同一流中获得的总输出

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题