Address parser in Spark Scala

tf7tbtn2 posted on 2023-10-18 in Scala

I have a CSV file containing the addressId and address of each customer, as follows:

| addressId | address |
| --- | --- |
| ADD001 | 123, Maccolm Street, Copenhagen, Denmark |
| ADD002 | 384, East Avenue Street, New York, USA |
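I want to parse the address column to get the number, street, city and country. I was given some initial code that I can build on to get the required output: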

```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object Address extends App {

  val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()

  import spark.implicits._

  Logger.getRootLogger.setLevel(Level.WARN)

  // One row of the CSV as it is read from disk
  case class AddressRawData(
    addressId: String,
    address: String
  )

  // The same row after the address has been split into its parts
  case class AddressData(
    addressId: String,
    address: String,
    number: Option[Int],
    road: Option[String],
    city: Option[String],
    country: Option[String]
  )

  // Splits "number, road, city, country" and fills in the Option fields
  def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
    unparsedAddress.map(address => {
      val split = address.address.split(", ")

      address.copy(
        number = Some(split(0).toInt),
        road = Some(split(1)),
        city = Some(split(2)),
        country = Some(split(3))
      )
    })
  }

  val addressDF: DataFrame = spark.read.option("header", "true").csv("src/main/resources/address_data.csv")

  val addressDS: Dataset[AddressRawData] = addressDF.as[AddressRawData]
}
```

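I assume I need to use the addressParser function to parse the data in addressDS. However, the function's parameter is a Seq, and I'm not sure how to pass addressDS to it as input to parse the raw data. I'd appreciate some guidance on how to approach this.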
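One way to connect a Dataset to a Seq-based function without changing the function is to call it once per partition with `Dataset.mapPartitions`, after widening each `AddressRawData` row into an `AddressData` whose parsed fields are still `None`. This is a sketch under stated assumptions, not code from the question or the answer: the object `AddressBridge` and the helper `parseAll` are hypothetical names, the case classes are moved to top level so Spark can derive encoders for them, and the addresses are assumed to use ", " as the separator (which is what `addressParser` splits on).

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Top-level case classes so spark.implicits can derive encoders for them.
case class AddressRawData(addressId: String, address: String)

case class AddressData(
    addressId: String,
    address: String,
    number: Option[Int],
    road: Option[String],
    city: Option[String],
    country: Option[String]
)

// Hypothetical wrapper object; addressParser is the question's function unchanged.
object AddressBridge {

  def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] =
    unparsedAddress.map { address =>
      val split = address.address.split(", ")
      address.copy(
        number = Some(split(0).toInt),
        road = Some(split(1)),
        city = Some(split(2)),
        country = Some(split(3))
      )
    }

  // Hypothetical helper: Dataset[AddressRawData] in, Dataset[AddressData] out.
  def parseAll(spark: SparkSession, raw: Dataset[AddressRawData]): Dataset[AddressData] = {
    import spark.implicits._
    raw
      // Widen each raw row into an AddressData whose parsed fields are still None.
      .map(r => AddressData(r.addressId, r.address, None, None, None, None))
      // Hand each partition's rows to the Seq-based parser as one Seq.
      // Depending on the Scala version, rows.toSeq may buffer the whole partition.
      .mapPartitions(rows => addressParser(rows.toSeq).iterator)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()
    import spark.implicits._

    val raw = Seq(
      AddressRawData("ADD001", "123, Maccolm Street, Copenhagen, Denmark"),
      AddressRawData("ADD002", "384, East Avenue Street, New York, USA")
    ).toDS()

    parseAll(spark, raw).show(false)
    spark.stop()
  }
}
```

For a small file, `addressParser(addressDS.map(...).collect().toSeq)` would also work, but it pulls every row onto the driver; `mapPartitions` keeps the parsing on the executors.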

Answer 1 (g6ll5ycj)

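```scala
  import org.apache.spark.sql.functions.collect_set

  // Assumes the CSV also has a customerId column and that AddressData is declared
  // with a customerId field (addressId, customerId, address, number, road, city, country);
  // the AddressData shown in the question does not have one.
  def addressParser(addressId: String, customerId: String, address: String): AddressData = {
    val split = address.split(", ")
    AddressData(addressId, customerId, address, Some(split(0).toInt), Some(split(1)), Some(split(2)), Some(split(3)))
  }

  // Register the parser so it can be called from SQL expressions
  spark.udf.register("addressParserUDF", addressParser _)

  // Parse every row, then collect each customer's parsed addresses into one array column
  val groupedAddressDataDS = addressDF
    .selectExpr("customerId", "addressParserUDF(addressId, customerId, address) as parsedAddress")
    .groupBy("customerId")
    .agg(collect_set($"parsedAddress").as("address"))

  // Join customerAccountDS and groupedAddressDataDS by customerId
  // (customerAccountDS and CustomerDocument are defined elsewhere, not in the question)
  val customerDocumentDS = customerAccountDS
    .join(groupedAddressDataDS, Seq("customerId"), "left_outer")
    .selectExpr("customerId", "forename", "surname", "accounts", "address")
    .as[CustomerDocument]
```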

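You need to define the addressParser method so that it takes the addressId and customerId along with the address. Then register addressParser as a UDF and use it to parse the address together with the addressId and customerId; this gives you the desired output.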
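The UDF above relies on a customerId column that the question's two-column CSV does not show. A sketch of the same UDF idea for the plain addressId/address layout might look like the following; `ParsedAddress`, `AddressUdfSketch`, `parse` and `withParsedAddress` are hypothetical names. It also swaps in a more tolerant split: it splits on "," and trims each part, so both "123, Maccolm Street" and "123,Maccolm Street" work, and a short or non-numeric address yields `None` instead of throwing.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}
import scala.util.Try

// Hypothetical struct returned by the UDF; field names match AddressData's parsed fields.
case class ParsedAddress(
    number: Option[Int],
    road: Option[String],
    city: Option[String],
    country: Option[String]
)

object AddressUdfSketch {

  // Split on "," and trim each part; missing or empty parts and a
  // non-numeric house number become None instead of throwing.
  def parse(address: String): ParsedAddress = {
    val parts = Option(address).map(_.split(",").map(_.trim).toVector).getOrElse(Vector.empty)
    def part(i: Int): Option[String] = parts.lift(i).filter(_.nonEmpty)
    ParsedAddress(part(0).flatMap(s => Try(s.toInt).toOption), part(1), part(2), part(3))
  }

  // udf derives a struct<number, road, city, country> result type from the case class.
  val parseAddressUdf = udf(parse _)

  // Adds the parsed fields as top-level columns next to addressId and address.
  def withParsedAddress(df: DataFrame): DataFrame =
    df.withColumn("parsed", parseAddressUdf(col("address")))
      .select(col("addressId"), col("address"), col("parsed.*"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("ADD001", "123, Maccolm Street, Copenhagen, Denmark"),
      ("ADD002", "384, East Avenue Street, New York, USA")
    ).toDF("addressId", "address")

    withParsedAddress(df).show(false)
    spark.stop()
  }
}
```

Because the resulting column names match the question's AddressData, `withParsedAddress(addressDF).as[AddressData]` should give the typed Dataset the question is after.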
