我有一个地址 Dataframe ,其中有三列,如:“addressId”、“customerId”、“address”。Address.csv中的值如下所示:A100,C100,“100,ABC Street,MyCity,MyCountry”。
Address AddressRaw的case类如下所示:
// Define your case class for AddressRaw
case class AddressRawData(
addressId: String,
customerId: String,
address: String
)
还有一个case类,它看起来像:
case class AddressData(
addressId: String,
customerId: String,
address: String,
number: Option[Int],
road: Option[String],
city: Option[String],
country: Option[String]
)
我想创建一个dataframe 'AddressData',其中包含AddressData中提到的所有相应列。来自Addressraw Dataframe的地址需要传递到解析器中以获得号码,道路,城市和国家。下面是我使用的代码,它给了我错误:Can not resolve 'number'
。它的到来作为数字不是地址数据框的一部分。
下面是我尝试的示例代码:
// ... (other imports and definitions)
object CustomerAddress extends App {
val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()
import spark.implicits._
Logger.getRootLogger.setLevel(Level.WARN)
// Define your case classes
case class AddressRawData(
addressId: String,
customerId: String,
address: String
)
case class AddressData(
addressId: String,
customerId: String,
address: String,
number: Option[Int],
road: Option[String],
city: Option[String],
country: Option[String]
)
case class AccountData(
customerId: String,
accountId: String,
balance: Long
)
case class CustomerAccountOutput(
customerId: String,
forename: String,
surname: String,
accounts: Seq[AccountData]
)
// ... (addressParser and other definitions)
val addressDF: DataFrame = spark.read.option("header", "false").csv("src/main/resources/address_data.csv")
.toDF("addressId", "customerId", "address")
val customerAccountDS = spark.read.parquet("src/main/resources/customerAccountOutputDS.parquet").as[CustomerAccountOutput]
// Define your addressParser function
def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
unparsedAddress.map(address => {
val split = address.address.split(", ")
address.copy(
number = Some(split(0).toInt),
road = Some(split(1)),
city = Some(split(2)),
country = Some(split(3))
)
})
}
// Apply the addressParser function to the address column
val parsedAddress = addressDF.as[AddressData].groupByKey(_.customerId).mapGroups {
case (customerId, addresses) => customerId -> addressParser(addresses.toSeq)
}.toDF("customerId", "address")
// Join the customerAccountDS and parsedAddress to create the final CustomerDocument
val finalDF = customerAccountDS.join(parsedAddress, Seq("customerId"), "inner")
.select(
$"customerId",
$"forename",
$"surname",
$"accounts",
$"address"
).as[CustomerDocument]
// Show the records in the final DataFrame
finalDF.show(false)
}
1条答案
按热度按时间zbq4xfa01#
错误信息不是很清楚,但是代码方面你不能在对象内部嵌套case类,spark.implicit的生成编码器的函数不能正确地找到它们。不幸的是,这会导致运行时错误,而不是编译时错误。
重新编写它,这样你就可以将case类作为顶层,例如: