scala—如何在spark列中编写函数,以便列中的每个字段都增加值?

ma8fv8wu  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(353)

它不是关于唯一id的,所以我不打算使用increase unique number api,而是尝试通过定制查询来解决它
考虑给定值30,现在是当前Dataframe df 需要添加一个名为 hop_number 因此,列中从上到下的每个字段将从30开始递增2,因此

with 2 parameters
x -> start number, here is 30
y -> like step or offset, here is 2

   hop_number
---------------
      30
      32
      34
      36
      38
      40
    ......

我知道在rdd我们可以用 map 但是如何在Dataframe中以最小的成本完成同样的工作呢?

df.column("hop_number", 30 + map(x => x + 2)) // pseudo code
ahy6op9u

ahy6op9u1#

检查以下代码。

scala> import org.apache.spark.sql.expressions._
scala> import org.apache.spark.sql.functions._

scala> val x = lit(30)
x: org.apache.spark.sql.Column = 30

scala> val y = lit(2)
y: org.apache.spark.sql.Column = 2

scala> df.withColumn("hop_number",(x + (row_number().over(Window.orderBy(lit(1)))-1) * y)).show(false)

+----------+
|hop_number|
+----------+
|30        |
|32        |
|34        |
|36        |
|38        |
+----------+
hpcdzsge

hpcdzsge2#

假设您有一个分组和排序列,您可以使用window函数。

import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import Window
tst= sqlContext.createDataFrame([(1,1,14),(1,2,4),(1,3,10),(2,1,90),(7,2,30),(2,3,11)],schema=['group','order','value'])
w=Window.partitionBy('group').orderBy('order')
tst_hop= tst.withColumn("temp",F.sum(F.lit(2)).over(w)).withColumn("hop_number",F.col('temp')+28)

结果是:

tst_hop.show()
+-----+-----+-----+----+----------+
|group|order|value|temp|hop_number|
+-----+-----+-----+----+----------+
|    1|    1|   14|   2|        30|
|    1|    2|    4|   4|        32|
|    1|    3|   10|   6|        34|
|    2|    1|   90|   2|        30|
|    2|    3|   11|   4|        32|
|    7|    2|   30|   2|        30|
+-----+-----+-----+----+----------+

如果您需要不同的方法,请提供Dataframe的示例数据。

相关问题