如何用scala/spark计算元素的出现次数?

ltskdhd1  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(423)

我有一个文件,里面有这样的元素列表

00|905000|20160125204123|79644809999||HGMTC|1||22|7905000|56321647569|||34110|I||||||250995210056537|354805064211510||56191|||38704||A|||11|V|81079681404134|5||||SE|||G|144|||||||||||||||Y|b00534589.huawei_anadyr.20151231184912||1|||||79681404134|0|||+@@+1{79098509982}2{2}3{2}5{79644809999}6{0000002A7A5AC635}7{79681404134}|20160125|

通过一系列的步骤,我成功地将它转换成这样的元素列表

(902996760100000,CompactBuffer(6, 5, 2, 2, 8, 6, 5, 3))

其中905000和902996760100000是键,6、5、2、2、8、6、5、3是值。值可以是1到8之间的数字。有没有办法用spark来计算这些值的出现次数,结果是这样的?

(902996760100000, 0_1, 2_2, 1_3, 0_4, 2_5, 2_6, 0_7, 1_8)

我可以用if-else块和工作人员来完成,但那不太好,所以我想知道在scala/spark中是否有我可以使用的工具。
这是我的密码。

class ScalaJob(sc: SparkContext) {
  def run(cdrPath: String) : RDD[(String, Iterable[String])] = {
    //pass the file
    val fileCdr = sc.textFile(cdrPath);

    //find values in every raw cdr
    val valuesCdr = fileCdr.map{
      dataRaw =>
      val p = dataRaw.split("[|]",-1)
      (p(1), ScalaJob.processType(ScalaJob.processTime(p(2)) + "_" + p(32)))
    }
    val x = valuesCdr.groupByKey()
    return x
  }

任何关于优化的建议都将不胜感激。我对scala/spark真的很陌生。

iqxoj9l9

iqxoj9l91#

首先,scala是一种类型安全的语言,spark的RDDAPI也是如此——因此强烈建议使用类型系统,而不是将所有内容“编码”为字符串。
所以我建议一个解决方案 RDD[(String, Seq[(Int, Int)])] (元组中的第二项是(id,count)元组序列)而不是 RDD[(String, Iterable[String])] 似乎没那么有用。
下面是一个简单的函数,它计算给定时间段中1到8的出现次数 Iterable[Int] :

def countValues(l: Iterable[Int]): Seq[(Int, Int)] = {
  (1 to 8).map(i => (i, l.count(_ == i)))
}

你可以用 mapValues 在一个 RDD[(String, Iterable[Int])] 要获得结果:

valuesCdr.groupByKey().mapValues(ScalaJob.countValues)

整个解决方案可以简化一点:

class ScalaJob(sc: SparkContext) {
  import ScalaJob._

  def run(cdrPath: String): RDD[(String, Seq[(Int, Int)])] = {
    val valuesCdr = sc.textFile(cdrPath)
      .map(_.split("\\|"))
      .map(p => (p(1), processType(processTime(p(2)), p(32))))

    valuesCdr.groupByKey().mapValues(countValues)
  }  
}

object ScalaJob {
  val dayParts = Map((6 to 11) -> 1, (12 to 18) -> 2, (19 to 23) -> 3, (0 to 5) -> 4)

  def processTime(s: String): Int = {
    val hour = DateTime.parse(s, DateTimeFormat.forPattern("yyyyMMddHHmmss")).getHourOfDay
    dayParts.filterKeys(_.contains(hour)).values.head
  }

  def processType(dayPart: Int, s: String): Int = s match {
    case "S" => 2 * dayPart - 1
    case "V" => 2 * dayPart
  }

  def countValues(l: Iterable[Int]): Seq[(Int, Int)] = {
    (1 to 8).map(i => (i, l.count(_ == i)))
  }
}

相关问题