Scala: how to write a generic function that computes column values inside withColumn on a Spark DataFrame

iih3973s  posted on 2021-05-27 in Spark

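Hi, I have the DataFrame below. It has a countries column along with several other columns and many rows. I want to write a generic function (because it is used in several places) that I can call inside withColumn to create a new column.
Input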

| countries  |
|------------|
| RFRA       | 
| BRES       |
| EAST       |
| RUSS       |
| ....       |

Output

| countries |
|-----------|
| FRA       | 
| BRA       |
| POL       |
| RUS       |
| ...       |

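Below is my code. When I pass the countries column to the function, the Column cannot be matched against the string literals. How do I extract the value from the column, compare it with the given strings, and return the result as a Column?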

val df = sample.withColumn("renamedcountries", replace($"countries"))

def replace(countries: Column) :Column = {
  val Updated = countries match {
    case "RFRA" => "FRA"
    case "BRES" => "BRA"
    case "RESP" => "ESP"
    case "RBEL" => "BEL"
    case "RGRB" => "GBR"
    case "RALL" => "DEU"
    case "MARO" => "MAR"
    case "RPOR" => "PRT"
    case _ => "unknown"
  }
  Updated
}

xwmevbvl1#

Wrap the function logic in a udf and call that udf from wherever you need it.

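import org.apache.spark.sql.functions._
// Needed for toDF and the $"..." syntax; `spark` is the active SparkSession
// (already in scope in spark-shell).
import spark.implicits._

val df = Seq("RFRA", "BRES", "RUSS").toDF("countries")

// A String => String UDF: Spark calls it once per row with the column's value,
// so ordinary pattern matching on strings works here.
val mapCountries = udf[String, String] { country =>
  country match {
    case "RFRA" => "FRA"
    case "BRES" => "BRA"
    case "RESP" => "ESP"
    case "RBEL" => "BEL"
    case "RGRB" => "GBR"
    case "RALL" => "DEU"
    case "MARO" => "MAR"
    case "RPOR" => "PRT"
    case _      => "unknown"
  }
}

df.withColumn("renamedCountries", mapCountries($"countries")).show()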

+---------+----------------+
|countries|renamedCountries|
+---------+----------------+
|     RFRA|             FRA|
|     BRES|             BRA|
|     RUSS|         unknown|
+---------+----------------+
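
Since the question asks for something reusable in several places, one way to organize the udf approach is sketched below. It keeps the mapping as a plain Scala function (which can be tested without Spark) and wraps it once in a udf. The names `CountryCodes`, `normalize`, `normalizeUdf` and `UdfReuseExample` are made up for this illustration, and the sketch assumes Spark 3.x running in local mode.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Hypothetical holder object (not from the thread) so every job imports the same logic.
object CountryCodes {
  // Plain Scala function: no Spark types involved, so it is easy to unit test.
  def normalize(code: String): String = code match {
    case "RFRA" => "FRA"
    case "BRES" => "BRA"
    case "RESP" => "ESP"
    case "RBEL" => "BEL"
    case "RGRB" => "GBR"
    case "RALL" => "DEU"
    case "MARO" => "MAR"
    case "RPOR" => "PRT"
    case _      => "unknown"
  }

  // Wrap the function once; callers apply it to any string column.
  val normalizeUdf: UserDefinedFunction = udf(normalize _)
}

object UdfReuseExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run for demonstration only
      .appName("udf-reuse")
      .getOrCreate()
    import spark.implicits._

    val df = Seq("RFRA", "BRES", "RUSS").toDF("countries")
    df.withColumn("renamedCountries", CountryCodes.normalizeUdf($"countries")).show()

    spark.stop()
  }
}
```

A udf is opaque to Spark's optimizer, so the column-expression approaches in the other answers may be preferable when the mapping is a simple lookup.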

ebdffaop2#

You can use typedLit here, so that whenever the mapping changes you only have to update the map:

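import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{lit, typedLit, when}

val df = Seq("RFRA", "BRES", "EAST", "RUSS").toDF("countries")

// typedLit turns the Scala Map into a map-typed literal Column.
val replaceMap = typedLit(Map("RFRA" -> "FRA",
  "BRES" -> "BRA",
  "RESP" -> "ESP",
  "RBEL" -> "BEL",
  "RGRB" -> "GBR",
  "RALL" -> "DEU",
  "MARO" -> "MAR",
  "RPOR" -> "PRT"))

def replace(countries: Column): Column = {
  // Look the value up in the map and fall back to "unknown" when the lookup is null.
  // The Column argument is used directly: $"$countries" would re-resolve it by its
  // string form, which only works when the argument is a plain column name.
  when(replaceMap(countries).isNotNull, replaceMap(countries))
    .otherwise(lit("unknown"))
}

val res = df.withColumn("modified_countries", replace($"countries"))
res.show(false)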

+---------+------------------+
|countries|modified_countries|
+---------+------------------+
|RFRA     |FRA               |
|BRES     |BRA               |
|EAST     |unknown           |
|RUSS     |unknown           |
+---------+------------------+
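
The when/isNotNull/otherwise pattern above can also be written with coalesce, which returns its first non-null argument. The following is only a sketch under the same assumption the answer makes, namely that looking up a missing key in the map column evaluates to null (the default, non-ANSI behaviour in Spark 3.x as far as I know). The object name `MapLookup` is hypothetical, and the map is shortened for brevity.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{coalesce, lit, typedLit}

// Hypothetical holder object (not from the thread).
object MapLookup {
  // Map-typed literal column; extend this map when new codes appear.
  private val replaceMap: Column = typedLit(Map(
    "RFRA" -> "FRA",
    "BRES" -> "BRA",
    "RESP" -> "ESP"
  ))

  // coalesce picks the map value when the key exists, otherwise the default.
  // Assumption: a missing key yields null rather than an error.
  def replace(countries: Column): Column =
    coalesce(replaceMap(countries), lit("unknown"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run for demonstration only
      .appName("map-lookup")
      .getOrCreate()
    import spark.implicits._

    val df = Seq("RFRA", "BRES", "EAST", "RUSS").toDF("countries")
    df.withColumn("modified_countries", replace($"countries")).show(false)

    spark.stop()
  }
}
```

Note that a null input value also ends up as "unknown" here, just as in the when/otherwise version.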

wf82jlnq3#

Define it as a reusable column expression:

def replace(c: Column): Column = {
  when(c === "RFRA", "FRA")
    .when(c === "BRES", "BRA")
    .when(c === "RESP", "ESP")
    .when(c === "RBEL", "BEL")
    // add more here
    .otherwise("unknown")
}

df
  .withColumn("countries", replace($"countries"))
  .show()

You can also pack the replacements into a Map and use it in the following expression:

val replaceMap = Map("RFRA" -> "FRA",
  "BRES" -> "BRA",
  "RESP" -> "ESP",
  "RBEL" -> "BEL",
  "RGRB" -> "GBR",
  "RALL" -> "DEU",
  "MARO" -> "MAR",
  "RPOR" -> "PRT")

def replace(countries: Column): Column = {
  // Start from a when() that never matches, then chain one .when per map entry.
  replaceMap.foldLeft(when(lit(false), countries)) { case (acc, (k, v)) => acc.when(countries === k, v) }
    .otherwise("unknown")
}
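
To make the foldLeft version generic in the sense the question asks for, the mapping and the default value can be passed as parameters. The sketch below is one possible shape, not the answerer's code: `GenericReplace`, `replaceWith`, and the `origin`/`destination` columns are hypothetical names, the map is shortened, and Spark 3.x in local mode is assumed.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{lit, when}

// Hypothetical helper object (not from the thread).
object GenericReplace {
  // Builds a CASE WHEN chain from any String -> String mapping.
  // The never-matching seed keeps .otherwise valid even for an empty mapping.
  def replaceWith(mapping: Map[String, String], default: String)(c: Column): Column =
    mapping
      .foldLeft(when(lit(false), lit(default))) { case (acc, (k, v)) => acc.when(c === k, v) }
      .otherwise(default)

  val countryCodes: Map[String, String] =
    Map("RFRA" -> "FRA", "BRES" -> "BRA", "RESP" -> "ESP")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run for demonstration only
      .appName("generic-replace")
      .getOrCreate()
    import spark.implicits._

    // Fix the mapping and default once, then reuse the Column => Column function.
    val normalize: Column => Column = replaceWith(countryCodes, "unknown")

    val df = Seq(("RFRA", "BRES"), ("EAST", "RESP")).toDF("origin", "destination")
    df.withColumn("origin_code", normalize($"origin"))
      .withColumn("destination_code", normalize($"destination"))
      .show()

    spark.stop()
  }
}
```

Because the result is an ordinary Column expression, it stays visible to the optimizer and can be used anywhere a Column is accepted, for example in select or filter as well as withColumn.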
