Converting a Spark Scala Dataset from one type to another

xvw2m8pv posted on 2022-12-13 in Scala

I have a Dataset with the following case class type:

case class AddressRawData(
                         addressId: String,
                         customerId: String,
                         address: String
                       )

I want to convert it to:

case class AddressData(
                          addressId: String,
                          customerId: String,
                          address: String,
                          number: Option[Int], //i.e. it is optional
                          road: Option[String],
                          city: Option[String],
                          country: Option[String]
                        )

using a parser function:

def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
    unparsedAddress.map(address => {
      val split = address.address.split(", ")
      address.copy(
        number = Some(split(0).toInt),
        road = Some(split(1)),
        city = Some(split(2)),
        country = Some(split(3))
      )
    }
    )
  }

I'm new to Scala and Spark. Could someone tell me how this can be done?

Answer 1 (uz75evzq):

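You're on the right track! There are of course several ways to do this. However, you are already creating case classes and have started writing a parser function, so an elegant solution is to use the Dataset's map function. According to the documentation, its signature is: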

def map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U]

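Here T is the starting type (AddressRawData in this case) and U is the type you want to end up with (AddressData in this case). The input to map is therefore a function that converts an AddressRawData into an AddressData. That is probably what you had started writing as addressParser.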
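The argument map expects is just a function value of type AddressRawData => AddressData. When that function type is expected, Scala converts a method with the matching parameter and result types automatically (eta-expansion). The sketch below uses a plain Scala Seq as a stand-in for the Dataset, because Seq.map takes a function of the same shape. toAddressData is a hypothetical placeholder that leaves the parsed fields empty. The names f, rows and converted are only illustrative.

```scala
case class AddressRawData(addressId: String, customerId: String, address: String)
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

// Hypothetical placeholder conversion: copies the raw fields and leaves the
// parsed fields empty; a real parser would fill them in
def toAddressData(raw: AddressRawData): AddressData =
  AddressData(raw.addressId, raw.customerId, raw.address, None, None, None, None)

// The method is eta-expanded into a function value of type T => U
val f: AddressRawData => AddressData = toAddressData

// Seq.map accepts a function of the same T => U shape as Dataset.map
val rows = Seq(AddressRawData("1", "1", "20, my super road, beautifulCity, someCountry"))
val converted: Seq[AddressData] = rows.map(f)
```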
Currently, however, your addressParser has this signature:

def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData]

To pass it to map, we need a function with this signature instead:

def newAddressParser(unparsedAddress: AddressRawData): AddressData
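You may prefer to keep the existing Seq-based addressParser. In that case, one option is a thin adapter with the newAddressParser signature. It wraps a single row in a one-element Seq and takes the head of the result. Below is a minimal sketch that assumes the optional fields start as None. The adapter still inherits the original parser's behaviour of throwing on malformed addresses. The name placeholder is only illustrative.

```scala
case class AddressRawData(addressId: String, customerId: String, address: String)
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

// The existing Seq-based parser from the question
def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] =
  unparsedAddress.map { address =>
    val split = address.address.split(", ")
    address.copy(
      number = Some(split(0).toInt),
      road = Some(split(1)),
      city = Some(split(2)),
      country = Some(split(3))
    )
  }

// Adapter: one raw row in, one parsed row out, so it has the shape Dataset.map needs
def newAddressParser(unparsedAddress: AddressRawData): AddressData = {
  val placeholder = AddressData(
    unparsedAddress.addressId,
    unparsedAddress.customerId,
    unparsedAddress.address,
    None, None, None, None
  )
  addressParser(Seq(placeholder)).head
}

val parsed = newAddressParser(AddressRawData("1", "1", "20, my super road, beautifulCity, someCountry"))
```

With import spark.implicits._ in scope, rawDS.map(newAddressParser) would then yield a Dataset[AddressData]. Rewriting the parser to work per row, as below, is cleaner and avoids the throw-on-bad-input problem.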

With all that in mind, we can put it together! Here is an example:

// Assumes spark-shell, where `spark` is the predefined SparkSession
import spark.implicits._
import scala.util.Try

// Your case classes
case class AddressRawData(addressId: String, customerId: String, address: String)
case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

// Your addressParser function, adapted to be able to feed into the Dataset.map
// function
def addressParser(rawAddress: AddressRawData): AddressData = {
  val addressArray = rawAddress.address.split(", ")
  AddressData(
    rawAddress.addressId,
    rawAddress.customerId,
    rawAddress.address,
    Try(addressArray(0).toInt).toOption,
    Try(addressArray(1)).toOption,
    Try(addressArray(2)).toOption,
    Try(addressArray(3)).toOption
  )
}

// Creating a sample dataset
val rawDS = Seq(
  AddressRawData("1", "1", "20, my super road, beautifulCity, someCountry"),
  AddressRawData("1", "1", "badFormat, some road, cityButNoCountry")
).toDS

val parsedDS = rawDS.map(addressParser)

parsedDS.show
+---------+----------+--------------------+------+-------------+----------------+-----------+
|addressId|customerId|             address|number|         road|            city|    country|
+---------+----------+--------------------+------+-------------+----------------+-----------+
|        1|         1|20, my super road...|    20|my super road|   beautifulCity|someCountry|
|        1|         1|badFormat, some r...|  null|    some road|cityButNoCountry|       null|
+---------+----------+--------------------+------+-------------+----------------+-----------+

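You had already anticipated that parsing might fail, which is why the extra fields are Options. That makes it easy to wrap each extraction from the raw address in scala.util.Try and add some robustness. As you can see, the second row contains null values for the fields that could not be parsed from its address string.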
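You can see the two failure modes that Try absorbs by running it on the second sample address directly. A non-numeric first piece makes toInt throw a NumberFormatException. A missing piece makes the array index throw an ArrayIndexOutOfBoundsException. Try(...).toOption turns both into None, and Spark shows None as null. The name pieces is only illustrative.

```scala
import scala.util.Try

val pieces = "badFormat, some road, cityButNoCountry".split(", ")

// "badFormat".toInt throws NumberFormatException, which Try captures -> None
val number: Option[Int] = Try(pieces(0).toInt).toOption
// Indices 1 and 2 exist, so their values are wrapped in Some
val road: Option[String] = Try(pieces(1)).toOption
val city: Option[String] = Try(pieces(2)).toOption
// Only three pieces exist, so index 3 throws ArrayIndexOutOfBoundsException -> None
val country: Option[String] = Try(pieces(3)).toOption
```

This matches the second row of the table above. On Scala 2.13 or later, pieces(0).toIntOption and pieces.toSeq.lift(3) would produce the same Options without throwing and catching exceptions. That is an alternative, not what the answer uses.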
Hope this helps!
