best-mapreduce算法计算每个重叠区间的个数

tv6aics1  于 2021-06-01  发布在  Hadoop
关注(0)|答案(2)|浏览(420)

格式上有几十亿个间隔 [a, b] ,它们都将数字空间分割成多个单独的片段。我打算用这个片段中重叠间隔的数量输出所有单个片段。
例如:有3个区间,分别是:[1,7],[2,3],[6,8]。输出结果如下:
[-∞, 1]: 0
[1, 2]: 1
[2, 3]: 2
[3, 6]: 1
[6, 7]: 2
[7, 8]: 1
[8, +∞]: 0
如果对于一台机器(不是mapreduce中的分布式解决方案),我知道可以将interval示例分解为 start_n , end_n ,按数字排序,从左到右迭代,并使用计数器计算当前工件和输出中的数量。但我不确定这个算法如何被分解成分布式的方式。
有什么建议吗?谢谢。

ohtdti5x

ohtdti5x1#

下面是一个有效的spark代码(至少在您的示例中它给出了正确的结果:
由于有两个笛卡尔积,代码效率不高。
此外,间隔比较的条件可能需要注意:)
请随意改进代码,并在这里张贴您的改进答案。

import org.apache.spark.{SparkConf, SparkContext}

object Main {

  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("")
  val sc = new SparkContext(conf)

  case class Interval(start : Double, end : Double)

  def main(args: Array[String]): Unit = {

sc.setLogLevel("ERROR")

val input = List(Interval(1, 7), Interval(2, 3), Interval(6, 8))
val infinities = List(Double.NegativeInfinity, Double.PositiveInfinity)
val inputRdd = sc.parallelize(input)
val infinitiesRdd = sc.parallelize(infinities)

// Get unique flat boundary values  e.g.: Interval(1, 7) will give 2 boundary values: [1, 7]
val boundaries = inputRdd.flatMap(v => List(v.start, v.end)).distinct()
// Additionally we will need negative and positive infinities
val all_boundaries = boundaries.union(infinitiesRdd)

// Calculate all intervals
val intervals = all_boundaries
  // For each interval start get all possible interval ends
  .cartesian(all_boundaries)    // [(1, 2), (1, 3), (1, 6), (2, 1), ...]
  // Filter out invalid intervals (where begin is either less or equal to the end)  e.g.: from previous comment (2, 1) is invalid interval
  .filter(v => v._1 < v._2)     // [(1, 2), (1, 3), (1, 6), (2, 3), ...]
  // Find lesser interval end e.g.: in previous comment (1, 2) -> 2 is smallest value for the same start (1)
  .reduceByKey((a, b) => Math.min(a, b))  // [(1, 2) (2, 3), ...]

// Uncommend this to print intermediate result
// intervals.sortBy(_._1).collect().foreach(println)

// Get counts of overlapping intervals
val countsPerInterval = intervals
  // for each small interval get all possible intput intervals e.g.:
  .cartesian(inputRdd)    // [((1, 2), Interval(1, 7)), ((1, 2), Interval(2, 3)), ...]
  // Filter out intervals that do not overlap
  .filter{ case (smallInterval, inputInterval) => inputInterval.start <= smallInterval._1 && inputInterval.end >= smallInterval._2}   // [((1, 2), Interval(1, 7)), ((1, 2), Interval(2, 3)), ...]
  // Since we're not interested in intervals, but only in count of intervals -> change interval to 1 for reduction
  .mapValues(_ => 1)      //[((1, 2), 1), ((1, 2), 1), ...]
  // Calculate a sum per interval
  .reduceByKey(_ + _)   // [((1, 2), 2), ...]

// print result
countsPerInterval.sortBy(_._1).collect().foreach(println)
  }

}
t98cgbkg

t98cgbkg2#

在mapreduce中,最简单的方法是将对中的每个数字都写入reducer。排序洗牌阶段负责对数字进行排序,而reducer负责修复。
e、 g.对于输入对 [1,7] Map器输出为:

key: NullWritable  Value: 1
key: NullWritable  Value: 7
key: NullWritable  Value: 1_7

对于相同的模式,所有Map器的输出形式为:

key: NullWritable  Value: 1
key: NullWritable  Value: 7
key: NullWritable  Value: 1_7
key: NullWritable  Value: 2
key: NullWritable  Value: 3
key: NullWritable  Value: 2_3
key: NullWritable  Value: 6
key: NullWritable  Value: 8
key: NullWritable  Value: 6_8

sort shuffle步骤将输出聚合为

Key: NullWritable  ListOfValue: [1,1_7,2,2_3,3,6,6_8,7,8]

reducer遍历值列表(这将是一个有序列表)并
将成对值分隔到单独的列表中 [1_7, 2_3, 6_8] . 你可以检查一下 _ 在课文中找出一对。
按如下所示重新配对空间值。 [-infinity, 1] [1, 2] [2, 3] [3, 6] [6, 7] [7, 8] [8, +infinity] 重新配对时,只需对照上面的列表检查边界即可找到计数。您可以用“\”分开这对,然后通过转换成数字 parse 功能。
e、 g.-无穷大(比如一个非常大的负长-9999999)超出了所有对的范围,因此减速机的输出将是 key: “[-无穷大,1]”( Text 类型) value: 0 ( intwritable(类型)
成对的也一样 [1,2] , 1>=1 and 2<=7 减速机输出 key: "[1, 2]" ( Text 类型) value: 1 ( intwritable(类型)
成对的 [6,7] , 6>=1 and 7<=7 以及 6>=6 and 7<=8 减速机输出 key: "[1, 2]" ( Text 类型) value: 2 ( intwritable(类型)
等等。。。
注: NullWritable 是一个 Java hadoop API ,表示 null . 而不是 NullWritable ,可以使用任何常量数据(例如 Hadoop Text 类型 Writable ). 这里的要点是确保所有Map器输出都应该由于相同的Map器键而落在单个reducer上。

相关问题