我正在计算每个窗口中的值,并找到最大值,只想将每个窗口的前10个常用值保存到hdfs,而不是所有值。
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1),StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))}
//sortedCounts.foreachRDD(rdd =>println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n")))
sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
我可以打印前10名以上(评论)。
我也试过了
sortedCounts.foreachRDD{ rdd => ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))}
但我得到以下错误。我的数组不可序列化
15/01/05 17:12:23错误actor.oneforonestrategy:org.apache.spark.streaming.streamingcontext java.io.notserializableeexception:org.apache.spark.streaming.streamingcontext
2条答案
按热度按时间j8yoct9x1#
你能试试这个吗?
注意:我没有测试代码段。。。
bqucvtff2#
你的第一个版本应该有用。申报就行了
@transient ssc = ...
首先创建流式处理上下文的位置。第二个版本不起作用
StreamingContext
无法在闭包中序列化。