我有一个csv数据文件作为一个序列文件存储在hdfs中,格式为 name, zip, country, fav_food1, fav_food2, fav_food3, fav_colour
. 可能有许多同名的条目,我需要找出他们最喜欢的食物是什么(即计算所有记录中所有同名的食物条目,然后返回最受欢迎的条目)。我是scala和spark的新手,已经完成了多个教程并浏览了论坛,但是对于如何继续,我很困惑。到目前为止,我已经得到了序列文件,它将文本转换成字符串格式,然后过滤掉条目
下面是文件中一行的示例数据项
Bob,123,USA,Pizza,Soda,,Blue
Bob,456,UK,Chocolate,Cheese,Soda,Green
Bob,12,USA,Chocolate,Pizza,Soda,Yellow
Mary,68,USA,Chips,Pasta,Chocolate,Blue
所以输出应该是元组(bob,soda),因为soda在bob的条目中出现的次数最多。
import org.apache.hadoop.io._
var lines = sc.sequenceFile("path",classOf[LongWritable],classOf[Text]).values.map(x => x.toString())
// converted to string since I could not get filter to run on Text and removing the longwritable
var filtered = lines.filter(_.split(",")(0) == "Bob");
// removed entries with all other users
var f_tuples = filtered.map(line => lines.split(",");
// split all the values
var f_simple = filtered.map(line => (line(0), (line(3), line(4), line(5))
// removed unnecessary fields
我现在的问题是,我想我有这个 [<name,[f,f,f]>]
但我不知道如何把它变平,得到最受欢迎的食物。我需要合并所有条目,这样我就有了一个带有a的条目,然后得到值中最常见的元素。任何帮助都将不胜感激。谢谢
我试着让它变平,但似乎我越试,数据结构变得越复杂。
var f_trial = fpairs.groupBy(_._1).mapValues(_.map(_._2))
// the resulting structure was of type org.apache.spark.rdd.RDD[(String, Interable[(String, String, String)]
这是一张打印出来的唱片在f\ U审判后的样子
("Bob", List((Pizza, Soda,), (Chocolate, Cheese, Soda), (Chocolate, Pizza, Soda)))
括号分解
("Bob",
List(
(Pizza, Soda, <missing value>),
(Chocolate, Cheese, Soda),
(Chocolate, Pizza, Soda)
) // ends List paren
) // ends first paren
3条答案
按热度按时间cetgtptt1#
paul和mattinbits提供的解决方案将数据洗牌两次—一次按名称和食物执行reduce,一次按名称执行reduce。只用一次洗牌就可以解决这个问题。
如果你不是一个纯粹主义者,你可以取代
scala.collection.immutable.Map
与scala.collection.mutable.Map
进一步提高绩效。eyh26e7m2#
下面是一个完整的示例:
ztmd8pv53#
我找到时间了。设置:
提取食物的选择,并为每一个做一个元组
((name, food), 1)
```val r2 = records.flatMap { r =>
val Array(name, id, country, food1, food2, food3, color) = r.split(',');
List(((name, food1), 1), ((name, food2), 1), ((name, food3), 1))
}
val r3 = r2.reduceByKey((x, y) => x + y)
val r4 = r3.map { case ((name, food), total) => (name, (food, total)) }
val res = r4.reduceByKey((x, y) => if (y._2 > x._2) y else x)
println(res.collect().mkString)
//(Mary,(Chips,1))(Bob,(Soda,3))
val r5 = r3.map { case ((name, food), total) => (name, (List(food), total)) }
val res2 = r5.reduceByKey((x, y) => if (y._2 > x._2) y
else if (y._2 < x._2) x
else (y._1:::x._1, y._2))
//(Mary,(List(Chocolate, Pasta, Chips),1))
//(Bob,(List(Soda),3))