我有一个文件,里面有这样的元素列表
00|905000|20160125204123|79644809999||HGMTC|1||22|7905000|56321647569|||34110|I||||||250995210056537|354805064211510||56191|||38704||A|||11|V|81079681404134|5||||SE|||G|144|||||||||||||||Y|b00534589.huawei_anadyr.20151231184912||1|||||79681404134|0|||+@@+1{79098509982}2{2}3{2}5{79644809999}6{0000002A7A5AC635}7{79681404134}|20160125|
通过一系列的步骤,我成功地将它转换成这样的元素列表
(902996760100000,CompactBuffer(6, 5, 2, 2, 8, 6, 5, 3))
其中905000和902996760100000是键,6、5、2、2、8、6、5、3是值。值可以是1到8之间的数字。有没有办法用spark来计算这些值的出现次数,结果是这样的?
(902996760100000, 0_1, 2_2, 1_3, 0_4, 2_5, 2_6, 0_7, 1_8)
我可以用if-else块和工作人员来完成,但那不太好,所以我想知道在scala/spark中是否有我可以使用的工具。
这是我的密码。
class ScalaJob(sc: SparkContext) {
def run(cdrPath: String) : RDD[(String, Iterable[String])] = {
//pass the file
val fileCdr = sc.textFile(cdrPath);
//find values in every raw cdr
val valuesCdr = fileCdr.map{
dataRaw =>
val p = dataRaw.split("[|]",-1)
(p(1), ScalaJob.processType(ScalaJob.processTime(p(2)) + "_" + p(32)))
}
val x = valuesCdr.groupByKey()
return x
}
任何关于优化的建议都将不胜感激。我对scala/spark真的很陌生。
1条答案
按热度按时间iqxoj9l91#
首先,scala是一种类型安全的语言,spark的RDDAPI也是如此——因此强烈建议使用类型系统,而不是将所有内容“编码”为字符串。
所以我建议一个解决方案
RDD[(String, Seq[(Int, Int)])]
(元组中的第二项是(id,count)元组序列)而不是RDD[(String, Iterable[String])]
似乎没那么有用。下面是一个简单的函数,它计算给定时间段中1到8的出现次数
Iterable[Int]
:你可以用
mapValues
在一个RDD[(String, Iterable[Int])]
要获得结果:整个解决方案可以简化一点: