Scala Spark: reduce by key and find the most common value

bvn4nwqk · posted 2021-06-03 in Hadoop

I have a CSV data file stored as a sequence file in HDFS, with the format `name, zip, country, fav_food1, fav_food2, fav_food3, fav_colour`. There can be many entries with the same name, and I need to find out what their favourite food is (i.e. count the food entries across all records with the same name and return the most popular one). I am new to Scala and Spark, have worked through several tutorials and searched the forums, but I am stuck on how to proceed. So far I have read the sequence file, converted the Text values to String, and then filtered the entries.
Here is some sample data from the file:

Bob,123,USA,Pizza,Soda,,Blue
Bob,456,UK,Chocolate,Cheese,Soda,Green
Bob,12,USA,Chocolate,Pizza,Soda,Yellow
Mary,68,USA,Chips,Pasta,Chocolate,Blue

So the output should be the tuple `(Bob, Soda)`, since Soda appears the most across Bob's entries.

import org.apache.hadoop.io._

var lines  = sc.sequenceFile("path",classOf[LongWritable],classOf[Text]).values.map(x => x.toString())
// converted to String since I could not get filter to run on Text, and dropped the LongWritable key

var filtered = lines.filter(_.split(",")(0) == "Bob");
// removed entries with all other users

var f_tuples = filtered.map(line => line.split(","))
// split each line into its fields

var f_simple = f_tuples.map(fields => (fields(0), (fields(3), fields(4), fields(5))))
// removed the unnecessary fields

My problem now is that I think I have something like `[<name,[f,f,f]>]`, but I don't know how to flatten it and get the most popular food. I need to merge all the entries so there is a single entry per name, and then get the most common element among the values. Any help would be appreciated. Thanks.
I tried to flatten it, but the more I try, the more convoluted the data structure seems to get.

var f_trial = f_simple.groupBy(_._1).mapValues(_.map(_._2))
// the resulting structure was of type org.apache.spark.rdd.RDD[(String, Iterable[(String, String, String)])]

This is what one record looks like when printed after `f_trial`:

("Bob", List((Pizza, Soda,), (Chocolate, Cheese, Soda), (Chocolate, Pizza, Soda)))

With the parentheses broken out:

("Bob", 

List(

(Pizza, Soda, <missing value>),

(Chocolate, Cheese, Soda),

(Chocolate, Pizza, Soda)

) // ends List paren

) // ends first paren
cetgtptt1#

The solutions provided by paul and mattinbits shuffle the data twice: once to perform a reduce keyed by (name, food), and once to reduce keyed by name. This can be solved with just one shuffle.

/** Generate key-food_count pairs from a split line **/
def bitsToKeyMapPair(xs: Array[String]): (String, Map[String, Long]) = {
  val key = xs(0)
  val map = xs
    .drop(3) // Drop name..country
    .take(3) // Take food
    .filter(_.trim.size !=0) // Ignore empty
    .map((_, 1L)) // Generate k-v pairs
    .toMap // Convert to Map
    .withDefaultValue(0L) // Set default

  (key, map)
}

/**Combine two count maps**/
def combine(m1: Map[String, Long], m2: Map[String, Long]): Map[String, Long] = {
  (m1.keys ++ m2.keys).map(k => (k, m1(k) + m2(k))).toMap.withDefaultValue(0L)
}

val n: Int = ??? // Number of favourite foods to keep per user

val records = lines.map(line => bitsToKeyMapPair(line.split(",")))
records.reduceByKey(combine).mapValues(_.toSeq.sortBy(-_._2).take(n))
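
For reference, here is a minimal way to drive this end to end on the question's sample rows. The `lines` RDD built with `parallelize`, the choice `n = 1`, and the `top` value are only for illustration and are not part of the original answer:

// Illustrative driver: parallelize the sample rows instead of reading the sequence file
val lines = sc.parallelize(Seq(
  "Bob,123,USA,Pizza,Soda,,Blue",
  "Bob,456,UK,Chocolate,Cheese,Soda,Green",
  "Bob,12,USA,Chocolate,Pizza,Soda,Yellow",
  "Mary,68,USA,Chips,Pasta,Chocolate,Blue"))

val n = 1 // keep only the single most frequent food per name

val top = lines
  .map(line => bitsToKeyMapPair(line.split(",")))
  .reduceByKey(combine)
  .mapValues(_.toSeq.sortBy(-_._2).take(n))

top.collect().foreach(println)
// Bob's entry is (Soda,3); all of Mary's foods tie at 1, so any one of them may be returned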

If you are not a purist, you can replace `scala.collection.immutable.Map` with `scala.collection.mutable.Map` to improve performance further.
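
A rough sketch of that mutable variant, assuming the rest of the pipeline stays the same (this is my reading of the suggestion, not code from the answer):

import scala.collection.mutable

// Hypothetical mutable-Map version of combine: fold m2 into m1 in place
// instead of allocating a new immutable map on every merge.
// bitsToKeyMapPair would then need to build a mutable.Map as well.
def combineMutable(m1: mutable.Map[String, Long],
                   m2: mutable.Map[String, Long]): mutable.Map[String, Long] = {
  m2.foreach { case (food, count) =>
    m1(food) = m1.getOrElse(food, 0L) + count
  }
  m1
}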

eyh26e7m2#

Here is a complete example:

import org.apache.spark.{SparkContext, SparkConf}

object Main extends App {

  val data = List(
    "Bob,123,USA,Pizza,Soda,,Blue",
    "Bob,456,UK,Chocolate,Cheese,Soda,Green",
    "Bob,12,USA,Chocolate,Pizza,Soda,Yellow",
    "Mary,68,USA,Chips,Pasta,Chocolate,Blue")

  val sparkConf = new SparkConf().setMaster("local").setAppName("example")
  val sc = new SparkContext(sparkConf)

  val lineRDD = sc.parallelize(data)

  val pairedRDD = lineRDD.map { line =>
    val fields = line.split(",")
    (fields(0), List(fields(3), fields(4), fields(5)).filter(_ != ""))
  }.filter(_._1 == "Bob")

  /*pairedRDD.collect().foreach(println)
    (Bob,List(Pizza, Soda))
    (Bob,List(Chocolate, Cheese, Soda))
    (Bob,List(Chocolate, Pizza, Soda))
   */

  val flatPairsRDD = pairedRDD.flatMap {
    case (name, foodList) => foodList.map(food => ((name, food), 1))
  }

  /*flatPairsRDD.collect().foreach(println)
    ((Bob,Pizza),1)
    ((Bob,Soda),1)
    ((Bob,Chocolate),1)
    ((Bob,Cheese),1)
    ((Bob,Soda),1)
    ((Bob,Chocolate),1)
    ((Bob,Pizza),1)
    ((Bob,Soda),1)
   */

  val nameFoodSumRDD = flatPairsRDD.reduceByKey((a, b) => a + b)

  /*nameFoodSumRDD.collect().foreach(println)
    ((Bob,Cheese),1)
    ((Bob,Soda),3)
    ((Bob,Pizza),2)
    ((Bob,Chocolate),2)
   */

  val resultsRDD = nameFoodSumRDD.map{
    case ((name, food), count) => (name, (food,count))
  }.groupByKey.map{
    case (name, foodCountList) => (name, foodCountList.toList.sortBy(_._2).reverse.head)
  }

  resultsRDD.collect().foreach(println)
  /*
      (Bob,(Soda,3))
   */

  sc.stop()
}
ztmd8pv53#

I found the time. Setup:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
val sc = new SparkContext(conf)

val data = """
  Bob,123,USA,Pizza,Soda,,Blue
  Bob,456,UK,Chocolate,Cheese,Soda,Green
  Bob,12,USA,Chocolate,Pizza,Soda,Yellow
  Mary,68,USA,Chips,Pasta,Chocolate,Blue
  """.trim

// trim each line so the indentation inside the triple-quoted string
// does not end up as part of the name field
val records = sc.parallelize(data.split('\n').map(_.trim))

Extract the food choices, and make a tuple `((name, food), 1)` for each one:
val r2 = records.flatMap { r =>
val Array(name, id, country, food1, food2, food3, color) = r.split(',');
List(((name, food1), 1), ((name, food2), 1), ((name, food3), 1))
}

Total for each name/food combination:

val r3 = r2.reduceByKey((x, y) => x + y)

Re-map so that the name (only) is the key:

val r4 = r3.map { case ((name, food), total) => (name, (food, total)) }

At each reduce step, pick the food with the higher count:

val res = r4.reduceByKey((x, y) => if (y._2 > x._2) y else x)

And we're done:

println(res.collect().mkString)
//(Mary,(Chips,1))(Bob,(Soda,3))

Edit: to collect all the foods that share a person's highest count, we just change the last two steps.
Start from a (single-element) list of foods alongside each total:

val r5 = r3.map { case ((name, food), total) => (name, (List(food), total)) }

On a tie, concatenate the food lists that share that score:

val res2 = r5.reduceByKey((x, y) =>
  if (y._2 > x._2) y
  else if (y._2 < x._2) x
  else (y._1 ::: x._1, y._2))

//(Mary,(List(Chocolate, Pasta, Chips),1))
//(Bob,(List(Soda),3))

If you want the top three, you would use `aggregateByKey` to build up a list of each person's favourite foods, instead of the second `reduceByKey`.
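
A rough sketch of what that could look like, building on `r4` above; the `insertTop`/`mergeTop` helpers and the choice of three are my own illustration, not from the original answer:

// Hypothetical top-3 per person via aggregateByKey, using r4: RDD[(String, (String, Int))].
// Each person's (food, total) pairs are folded into a running list that is
// kept sorted by count (descending) and truncated to the top three.
val zero = List.empty[(String, Int)]

def insertTop(acc: List[(String, Int)], x: (String, Int)): List[(String, Int)] =
  (x :: acc).sortBy(-_._2).take(3)

def mergeTop(a: List[(String, Int)], b: List[(String, Int)]): List[(String, Int)] =
  (a ::: b).sortBy(-_._2).take(3)

val top3 = r4.aggregateByKey(zero)(insertTop, mergeTop)

println(top3.collect().mkString)
// Bob's list starts with (Soda,3); the order of tied entries may vary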
