输入数据
val df = Seq(
("1", 1, 1, "on"),
("1", 2, 2, "off"),
("1", 2, 5, "off"),
("1", 5, 5, "on"),
("1", 5, 6, "off"),
("2", 1, 1, "off"),
("2", 1, 2, "off"),
("2", 2, 2, "on"),
("2", 3, 4, "off"),
("2", 5, 7, "off"),
("2", 8, 10, "on"),
("2", 11, 11, "on"),
("2", 11, 12, "off"),
("3", 1, 12, "off")
).toDF("id", "start", "end", "sw")
我正在尝试使用groupby和mapgroups合并行。
期望输出
1 1 5 on
1 5 6 on
2 1 2 off
2 2 7 on
2 8 10 on
2 11 12 on
3 1 12 off
逻辑如下。每一个off行都合并到上一个on行中。如果第一个或唯一一个值为off,则得到一个off行。从第一行开始,从最后一行结束。数据应该按开始和结束排序。
这是我到目前为止的情况
df
.as[Row]
.groupByKey(_.id)
.mapGroups{case(k, iter) => Row.merge(iter)}
我按id对数据进行分组,然后尝试迭代其他值。
case class Row(id:String, start:Int, var end:Int, sw:String)
object Row {
def merge(iter: Iterator[Row]): ListBuffer[Row] = {
val listBuffer = new ListBuffer[Row]
var bufferRow = Row("", 0, 0, "")
for(row <- iter){
if(listBuffer.isEmpty) bufferRow = row
else if(row.sw == "off") bufferRow.end = row.end
else if(row.sw == "on") {
listBuffer += bufferRow
bufferRow = row
}
}
if(listBuffer.isEmpty) listBuffer += bufferRow
listBuffer
}
}
我的输出
[WrappedArray([1,5,6,off])]
[WrappedArray([2,11,12,off])]
[WrappedArray([3,1,12,off])]
我已经使用窗口函数和累积和完成了类似的工作。在这里我试图学习一种新的方法。
使用spark 2.2和scala 2.11。
1条答案
按热度按时间dddzy1tm1#
你提出的解决方案几乎是对的,只是需要一些调整
首先,作为你的方法
Row.merge
返回行列表而不是行,应使用flatmapgroups将列表分解为数据集中的不同记录:接下来,让我们深入了解您的
Row.merge
方法。创建一个空的
bufferRow
在循环的第一次迭代中填充iter
带声明if (listBuffer.isEmpty) bufferRow = row
. 但是,此语句中的条件对于所有迭代都是真的,这就是为什么输出只包含每个组的最新行。所以这句话应该删除。初始化bufferRow
,你可以打电话给我iter.next()
:由于迭代器来自groupby,它至少包含一个元素,因此
iter.next()
不会引发异常。作为对.next()
方法移除迭代器的第一个元素,之后的循环不会重新处理第一个元素。接下来,在return语句之前的方法的最后一个语句是
if (listBuffer.isEmpty) listBuffer += bufferRow
. 此语句不应有条件。实际上,在您的循环中,您填充了
bufferRow
然后你把它加到listBuffer
仅当当前处理的行的“sw”字段设置为“on”时。当前处理的行将成为新的bufferRow
. 意思是最后一个bufferRow
永远不会保存在listBuffer
,除非listBuffer
是空的。所以你的最后几行merge
方法应为:我们现在有了完整的
merge
方法:运行此代码将得到以下结果,按id和起始列重新排序:
最后一点注意:如果您在分区数据集上运行此代码,您应该小心迭代器排序,我不确定spark是否正确
groupBy
方法保持按迭代器分组的行的顺序。也许使用.toSeq.sortBy(...)
在我开始之前。