Scala: iterate over DataFrame rows with Spark's mapPartitions function and add a new column

wj8zmpe1 · posted 2021-05-29 in Spark

I am new to Spark and Scala. I am trying to use Spark's mapPartitions function on a DataFrame to iterate over its rows and derive a new column based on the value of another column in the previous row.
Input DataFrame:

+------------+----------+-----------+
|   order_id |person_id | create_dt |
+------------+----------+-----------+
|          1 |        1 | 2020-01-11|
|          2 |        1 | 2020-01-12|
|          3 |        1 | 2020-01-13|
|          4 |        1 | 2020-01-14|
|          5 |        1 | 2020-01-15|
|          6 |        1 | 2020-01-16|
+------------+----------+-----------+

From the DataFrame above, I want to use the mapPartitions function to call a Scala method that takes an Iterator[Row] and produces output rows with a new column, date_diff. The new column is derived as the difference in days between the create_dt values of the current row and the previous row (the first row has no previous row, hence NA).
Expected output DataFrame:

+------------+----------+-----------+-----------+
|   order_id |person_id | create_dt | date_diff |
+------------+----------+-----------+-----------+
|          1 |        1 | 2020-01-11|        NA |
|          2 |        1 | 2020-01-12|         1 |
|          3 |        1 | 2020-01-13|         1 |
|          4 |        1 | 2020-01-14|         1 |
|          5 |        1 | 2020-01-15|         1 |
|          6 |        1 | 2020-01-16|         1 |
+------------+----------+-----------+-----------+
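For comparison, this "value from the previous row" pattern is what Spark's built-in window functions are designed for. A minimal sketch, assuming the input DataFrame from the question is available as `input_data` inside an active SparkSession:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, datediff, lag, lit}

// Assumes `input_data` has columns (order_id, person_id, create_dt).
// lag() fetches the previous row's create_dt within each person's window,
// datediff() computes the day gap, and coalesce() fills the first row with "NA".
val w = Window.partitionBy("person_id").orderBy("create_dt")
val withDiff = input_data.withColumn(
  "date_diff",
  coalesce(
    datediff(col("create_dt"), lag(col("create_dt"), 1).over(w)).cast("string"),
    lit("NA")
  )
)
```

Unlike mapPartitions, the window handles ordering and partition boundaries for you, which is why it is the usual tool for this kind of derivation.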

The code I have tried so far:

// Read input data (toDF on an RDD requires `import spark.implicits._`)
val input_data = sc.parallelize(Seq((1,1,"2020-01-11"), (2,1,"2020-01-12"), (3,1,"2020-01-13"), (4,1,"2020-01-14"), (5,1,"2020-01-15"), (6,1,"2020-01-16"))).toDF("order_id", "person_id", "create_dt")

// Generate output data using mapPartitions and call the getDateDiff method
// (mapPartitions on a DataFrame also needs an explicit Encoder[Row], e.g. RowEncoder(schema))
val output_data = input_data.mapPartitions(getDateDiff).show()

// getDateDiff method to iterate over each row and derive the date difference
def getDateDiff(srcItr: scala.collection.Iterator[Row]): Iterator[Row] = {
  for (row <- srcItr) { row.get(2) }
  /* derive date difference and generate output row */
}

Can someone help me write the getDateDiff method so that it produces the expected output?
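The core of getDateDiff is a stateful pass over the iterator: remember the previous row's date, emit "NA" for the first row and the day difference otherwise. A sketch of that logic as a plain, Spark-free helper (the tuple signature and the name `withDateDiff` are illustrative, not from the question):

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// Hypothetical helper: walks rows in order and appends the day difference
// between each row's create_dt and the previous row's create_dt.
def withDateDiff(rows: Iterator[(Int, Int, String)]): Iterator[(Int, Int, String, String)] = {
  var prev: Option[LocalDate] = None  // date seen on the previous row, if any
  rows.map { case (orderId, personId, createDt) =>
    val cur = LocalDate.parse(createDt)
    val diff = prev match {
      case Some(p) => ChronoUnit.DAYS.between(p, cur).toString
      case None    => "NA"  // first row has no predecessor
    }
    prev = Some(cur)
    (orderId, personId, createDt, diff)
  }
}

val out = withDateDiff(Iterator((1, 1, "2020-01-11"), (2, 1, "2020-01-12"))).toList
// out: List((1,1,"2020-01-11","NA"), (2,1,"2020-01-12","1"))
```

To adapt this to an Iterator[Row], read the fields with row.getInt / row.getString and emit Row(...) values with an explicit encoder. One caveat: mapPartitions only sees the rows inside a single partition, so diffs restart at "NA" at every partition boundary; unless the data is repartitioned by person_id and sorted by create_dt first, a window function is the safer route.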
