我更喜欢Spark;我正在研究spark streaming用例,在这个用例中,我收到一条json消息,每条json消息都有一个属性'value',在解析json之后,这个属性是double,我得到一个数组[double]。这是我的密码。
val record = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER_2)
val lines=record.map(_._2)
val valueDtsream:DStream[Array[Double]]=lines.map { jsonRecord => parseJson(jsonRecord) }
.window(Seconds(15),Seconds(2))
valueDtsream.foreachRDD
{
rdd =>
if (!rdd.partitions.isEmpty)
{
//code to find min and max
}
}
ssc.start()
ssc.awaitTermination()
1条答案
按热度按时间s4chpxco1#
尝试: