Scala Spark Column Parser with Seq and case class

tquggr8v  posted on 2023-10-18 in Scala

I have an address DataFrame with three columns: "addressId", "customerId", and "address". A row in Address.csv looks like this: A100,C100,"100,ABC Street,MyCity,MyCountry".
The case class for the raw address (AddressRawData) looks like this:

// Define your case class for AddressRaw
  case class AddressRawData(
    addressId: String,
    customerId: String,
    address: String
  )

There is also a second case class, which looks like this:

case class AddressData(
    addressId: String,
    customerId: String,
    address: String,
    number: Option[Int],
    road: Option[String],
    city: Option[String],
    country: Option[String]
  )

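I want to create a DataFrame 'AddressData' that contains all the columns defined in AddressData. The address from the raw address DataFrame needs to be passed through a parser to get the number, road, city, and country. Below is the code I used, which gives me the error:
Can not resolve 'number'. It appears because number is not part of the address DataFrame.
Here is the sample code I tried: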

// ... (other imports and definitions)

object CustomerAddress extends App {
 val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()
 import spark.implicits._
 Logger.getRootLogger.setLevel(Level.WARN)

 // Define your case classes
 case class AddressRawData(
   addressId: String,
   customerId: String,
   address: String
 )

 case class AddressData(
   addressId: String,
   customerId: String,
   address: String,
   number: Option[Int],
   road: Option[String],
   city: Option[String],
   country: Option[String]
 )

 case class AccountData(
   customerId: String,
   accountId: String,
   balance: Long
 )

 case class CustomerAccountOutput(
   customerId: String,
   forename: String,
   surname: String,
   accounts: Seq[AccountData]
 )

 // ... (addressParser and other definitions)

 val addressDF: DataFrame = spark.read.option("header", "false").csv("src/main/resources/address_data.csv")
   .toDF("addressId", "customerId", "address")

 val customerAccountDS = spark.read.parquet("src/main/resources/customerAccountOutputDS.parquet").as[CustomerAccountOutput]

 // Define your addressParser function
 def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
   unparsedAddress.map(address => {
     val split = address.address.split(", ")
     address.copy(
       number = Some(split(0).toInt),
       road = Some(split(1)),
       city = Some(split(2)),
       country = Some(split(3))
     )
   })
 }

 // Apply the addressParser function to the address column
 val parsedAddress = addressDF.as[AddressData].groupByKey(_.customerId).mapGroups {
   case (customerId, addresses) => customerId -> addressParser(addresses.toSeq)
 }.toDF("customerId", "address")

 // Join the customerAccountDS and parsedAddress to create the final CustomerDocument
 val finalDF = customerAccountDS.join(parsedAddress, Seq("customerId"), "inner")
   .select(
     $"customerId",
     $"forename",
     $"surname",
     $"accounts",
     $"address"
   ).as[CustomerDocument]

 // Show the records in the final DataFrame
 finalDF.show(false)
}

Answer #1 (zbq4xfa0)

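The error message is not very clear, but as far as the code goes, you cannot nest the case classes inside the object: the functions in spark.implicits that generate encoders cannot find them correctly. Unfortunately, this shows up as a runtime error rather than a compile-time error.
Rewrite the code so that the case classes are at the top level, for example: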

// Define your case classes
 case class AddressRawData(
   addressId: String,
   customerId: String,
   address: String
 )

 case class AddressData(
   addressId: String,
   customerId: String,
   address: String,
   number: Option[Int],
   road: Option[String],
   city: Option[String],
   country: Option[String]
 )

 case class AccountData(
   customerId: String,
   accountId: String,
   balance: Long
 )
...

object CustomerAddress extends App {
 val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()
 import spark.implicits._
 Logger.getRootLogger.setLevel(Level.WARN)

....
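
Separately from where the case classes live, the call addressDF.as[AddressData] asks Spark to find number, road, city, and country in a DataFrame that only has three columns, which is a plausible source of the "cannot resolve 'number'" message. One way around that, sketched below, is to read the rows as AddressRawData and build AddressData from each row. AddressParser and its parse method are hypothetical names, not from the original post, and the comma-and-trim split is an assumption based on the sample row A100,C100,"100,ABC Street,MyCity,MyCountry" (which has no space after the commas).

```scala
import scala.util.Try

// Same shapes as the case classes in the question, kept at top level.
final case class AddressRawData(addressId: String, customerId: String, address: String)

final case class AddressData(
  addressId: String,
  customerId: String,
  address: String,
  number: Option[Int],
  road: Option[String],
  city: Option[String],
  country: Option[String]
)

// Hypothetical helper (not from the original post).
object AddressParser {
  // Builds an AddressData from a raw row, so the Dataset being read only
  // needs the three columns that actually exist in the CSV.
  def parse(raw: AddressRawData): AddressData = {
    // Split on commas and trim each piece, so both "100,ABC Street" and
    // "100, ABC Street" are handled (assumed input formats).
    val parts = raw.address.split(",").map(_.trim).toVector

    // lift returns None for a missing index instead of throwing.
    def field(i: Int): Option[String] = parts.lift(i).filter(_.nonEmpty)

    AddressData(
      addressId = raw.addressId,
      customerId = raw.customerId,
      address = raw.address,
      // A non-numeric house number becomes None rather than failing the job.
      number = field(0).flatMap(s => Try(s.toInt).toOption),
      road = field(1),
      city = field(2),
      country = field(3)
    )
  }
}
```

Inside the Spark job, with import spark.implicits._ in scope, this could be applied as addressDF.as[AddressRawData].map(raw => AddressParser.parse(raw)), which should yield a Dataset[AddressData] without requiring the extra columns up front. Returning None for missing or malformed pieces is a design choice made here for illustration; failing fast may be preferable if bad addresses should stop the job.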
