如何在scala中使用mappartitions?

x759pob2  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(544)

我想用 mapPartitions 但得到以下错误。

[error]  found   : Unit
[error]  required: Iterator[?]
[error] Error occurred in an application involving default arguments.
[error]         rdd.mapPartitions(showParts)

我称之为 mapPartitions 功能如下。

rdd.mapPartitions(showParts)

哪里 showParts 函数定义如下。

def showParts(iter: Iterator[(Long, Array[String])]) = 
{ 
  while (iter.hasNext)
  {
    val cur = iter.next;
    // Do something with cur
  }
}

正确的使用方法是什么 mapPartitions 在这里?

vmjh9lq9

vmjh9lq91#

你需要归还一张支票 Iterator 从你的 showParts 功能。

def onlyEven(numbers: Iterator[Int]) : Iterator[Int] = 
  numbers.filter(_ % 2 == 0)

def partitionSize(numbers: Iterator[Int]) : Iterator[Int] = 
  Iterator.single(numbers.length)

val rdd = sc.parallelize(0 to 10)
rdd.mapPartitions(onlyEven).collect()
// Array[Int] = Array(0, 2, 4, 6, 8, 10)

rdd.mapPartitions(size).collect()
// Array[Int] = Array(2, 3, 3, 3)
vuktfyat

vuktfyat2#

问题是你传递给的自定义项 mapPartitions 必须具有返回类型 Iterator[U] . 您当前的代码不返回任何内容,因此属于类型 Unit .
如果你想得到一个空的 RDD 在执行 mapPartitions 然后可以执行以下操作:

def showParts(iter: Iterator[(Long, Array[String])]) = 
{ 
  while (iter.hasNext)
  {
    val cur = iter.next;
    // Do something with cur
  }

  // return Iterator[U]
  Iterator.empty 
}

相关问题