合并数据集中的行

anauzrmj  于 2021-05-17  发布在  Spark
关注(0)|答案(1)|浏览(460)

输入数据

val df = Seq(
  ("1", 1, 1, "on"),
  ("1", 2, 2, "off"),
  ("1", 2, 5, "off"),
  ("1", 5, 5, "on"),
  ("1", 5, 6, "off"),
  ("2", 1, 1, "off"),
  ("2", 1, 2, "off"),
  ("2", 2, 2, "on"),
  ("2", 3, 4, "off"),
  ("2", 5, 7, "off"),
  ("2", 8, 10, "on"),
  ("2", 11, 11, "on"),
  ("2", 11, 12, "off"),
  ("3", 1, 12, "off")
).toDF("id", "start", "end", "sw")

我正在尝试使用groupby和mapgroups合并行。
期望输出

1 1 5 on
1 5 6 on
2 1 2 off
2 2 7 on
2 8 10 on
2 11 12 on
3 1 12 off

逻辑如下。每一个off行都合并到上一个on行中。如果第一个或唯一一个值为off,则得到一个off行。从第一行开始,从最后一行结束。数据应该按开始和结束排序。
这是我到目前为止的情况

df
  .as[Row]
  .groupByKey(_.id)
  .mapGroups{case(k, iter) => Row.merge(iter)}

我按id对数据进行分组,然后尝试迭代其他值。

case class Row(id:String, start:Int, var end:Int, sw:String)

object Row {
  def merge(iter: Iterator[Row]): ListBuffer[Row] = {
    val listBuffer = new ListBuffer[Row]
    var bufferRow = Row("", 0, 0, "")
    for(row <- iter){
      if(listBuffer.isEmpty) bufferRow = row
      else if(row.sw == "off") bufferRow.end = row.end
      else if(row.sw == "on") {
        listBuffer += bufferRow
        bufferRow = row
      }
    }
    if(listBuffer.isEmpty) listBuffer += bufferRow
    listBuffer
  }
}

我的输出

[WrappedArray([1,5,6,off])]
[WrappedArray([2,11,12,off])]
[WrappedArray([3,1,12,off])]

我已经使用窗口函数和累积和完成了类似的工作。在这里我试图学习一种新的方法。
使用spark 2.2和scala 2.11。

dddzy1tm

dddzy1tm1#

你提出的解决方案几乎是对的,只是需要一些调整
首先,作为你的方法 Row.merge 返回行列表而不是行,应使用flatmapgroups将列表分解为数据集中的不同记录:

df
  .as[Row]
  .groupByKey(_.id)
  .flatMapGroups { case (k, iter) => Row.merge(iter) }

接下来,让我们深入了解您的 Row.merge 方法。
创建一个空的 bufferRow 在循环的第一次迭代中填充 iter 带声明 if (listBuffer.isEmpty) bufferRow = row . 但是,此语句中的条件对于所有迭代都是真的,这就是为什么输出只包含每个组的最新行。所以这句话应该删除。初始化 bufferRow ,你可以打电话给我 iter.next() :

... = new ListBuffer[Row]
var bufferRow = iter.next()
for(row <- iter) { ...

由于迭代器来自groupby,它至少包含一个元素,因此 iter.next() 不会引发异常。作为对 .next() 方法移除迭代器的第一个元素,之后的循环不会重新处理第一个元素。
接下来,在return语句之前的方法的最后一个语句是 if (listBuffer.isEmpty) listBuffer += bufferRow . 此语句不应有条件。
实际上,在您的循环中,您填充了 bufferRow 然后你把它加到 listBuffer 仅当当前处理的行的“sw”字段设置为“on”时。当前处理的行将成为新的 bufferRow . 意思是最后一个 bufferRow 永远不会保存在 listBuffer ,除非 listBuffer 是空的。所以你的最后几行 merge 方法应为:

...
    bufferRow = row
  }
}
listBuffer += bufferRow

我们现在有了完整的 merge 方法:

def merge(iter: Iterator[Row]): ListBuffer[Row] = {
  val listBuffer = new ListBuffer[Row]
  var bufferRow = iter.next()
  for (row <- iter) {
    if (row.sw == "off") bufferRow.end = row.end
    else if (row.sw == "on" ) {
      listBuffer += bufferRow
      bufferRow = row
    }
  }
  listBuffer += bufferRow
}

运行此代码将得到以下结果,按id和起始列重新排序:

+---+-----+---+---+
|id |start|end|sw |
+---+-----+---+---+
|1  |1    |5  |on |
|1  |5    |6  |on |
|2  |1    |2  |off|
|2  |2    |7  |on |
|2  |8    |10 |on |
|2  |11   |12 |on |
|3  |1    |12 |off|
+---+-----+---+---+

最后一点注意:如果您在分区数据集上运行此代码,您应该小心迭代器排序,我不确定spark是否正确 groupBy 方法保持按迭代器分组的行的顺序。也许使用 .toSeq.sortBy(...) 在我开始之前。

相关问题