我有一个CSV文件包含addressId和地址的客户如下
| addressId|地址|
| --|--|
| ADD001| 123,Maccolm Street,Copenhagen,丹麦|
| ADD002|“384,East Avenue Street,纽约,美国|
我想解析地址列以获得号码、街道、城市和国家。我得到了初始代码,可以在此基础上进行构建以获得必要的输出
object Address extends App {
val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()
import spark.implicits._
Logger.getRootLogger.setLevel(Level.WARN)
case class AddressRawData(
addressId: String,
address: String
)
case class AddressData(
addressId: String,
address: String,
number: Option[Int],
road: Option[String],
city: Option[String],
country: Option[String]
)
def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
unparsedAddress.map(address => {
val split = address.address.split(", ")
address.copy(
number = Some(split(0).toInt),
road = Some(split(1)),
city = Some(split(2)),
country = Some(split(3))
)
}
)
}
val addressDF: DataFrame = spark.read.option("header", "true").csv("src/main/resources/address_data.csv")
val addressDS: Dataset[AddressRawData] = addressDF.as[AddressRawData]
}
我假设我需要使用addressParser函数来解析我的addressDS信息。但是,函数的参数是Seq类型。我不确定如何将addressDS作为输入转换为函数来解析原始数据。希望有某种形式的指导来解决这个问题。
1条答案
按热度按时间g6ll5ycj1#
您需要定义addressParser方法,以便将addressId、customerId沿着包含在地址中。然后将addressParser函数注册为一个UDF,并使用它来解析地址、addressId和customerId,这将为您提供给予理想的输出