传递以任何case类返回类型作为参数的函数

wgxvkvu9  于 2021-07-14  发布在  Java
关注(0)|答案(1)|浏览(438)

这也许是个愚蠢的问题,但我已经挣扎了很长时间了。它确实类似于这个问题,但我无法在我的代码中应用它(由于模式或作为一个函数)。
我想将一个flatmap(或map)转换函数传递给一个函数参数,然后将它代理给一个实际调用df.rdd.flatmap方法的策略函数。我会尽力解释的!

case class Order(id: String, totalValue: Double, freight: Double) 
case class Product(id: String, price: Double) 

... or any other case class, whatever one needs to transform a row into ...

实体类:

class Entity(path: String) = {
  ...
  def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Entity = {
      this.getStrategy.flatMap[T](mapFunction)
      return this
  }
  def save(path: String): Unit = {
      ... write logic ...
  } 
}

实体的方法可能有不同的策略。实体策略如下:

abstract class EntityStrategy(private val entity: Entity,
                              private val spark: SparkSession) {
  ...
  def flatMap[T](mapFunction: (Row) => ArrayBuffer[T])
  def map[T](mapFunction: (Row) => T)
}

以及一个实体战略实施示例:

class SparkEntityStrategy(private val entity: Entity, private val spark: SparkSession)
  extends EntityStrategy(entity, spark) {
  ...
  override def map[T](mapFunction: Row => T): Unit = {
    val rdd = this.getData.rdd.map(f = mapFunction)
    this.dataFrame = this.spark.createDataFrame(rdd)
  }

  override def flatMap[T](mapFunction: (Row) => ArrayBuffer[T]): Unit = {
    var rdd = this.getData.rdd.flatMap(f = mapFunction)
    this.dataFrame = this.spark.createDataFrame(rdd)
  }
}

最后,我想创建一个flatmap/map函数并这样调用它:

def transformFlatMap(row: Row): ArrayBuffer[Order] = {
    var orders = new ArrayBuffer[Order]
    var _deliveries = row.getAs[Seq[Row]]("deliveries")
    _deliveries.foreach(_delivery => {
       var order = Order(
           id = row.getAs[String]("id"),
           totalValue = _delivery.getAs("totalAmount").asInstanceOf[Double])
      orders += order
    })
   return orders
}

val entity = new Entity("path")
entity.flatMap[Order](transformFlatMap).save("path")

当然,这是行不通的。我在sparkentitystrategy上出错了:
错误:(95,35)没有可用于t val rdd=this.getdata.rdd.map(f=mapfunction)的类标记
我试过添加一个 (implicit encoder: Encoder: T) 实体方法和战略方法,但这是不可能的。可能是做错了什么,因为我刚到斯卡拉。
如果我去掉“t”并通过一个实际的case类,一切都会正常。

cgvd09ve

cgvd09ve1#

为了让编译器和spark的方法都得到满足,我需要添加以下类型标记:
[ T <: scala.Product : ClassTag : TypeTag ]
所以这两种方法都变成了:

def map[T <: Product : ClassTag : TypeTag](mapFunction: (Row) => T): Entity
def flatMap[T <: scala.Product : ClassTag : TypeTag](mapFunction: (Row) => TraversableOnce[T]): Entity

关于scala.product:
所有产品的基本特征,在标准库中至少包括scala.product1到scala.product22,因此也包括它们的子类scala.tuple1到scala.tuple22。此外,所有case类都使用综合生成的方法实现产品。
因为我使用case类对象作为函数的返回类型,所以我需要scala.product以便spark的createdataframe能够匹配正确的重载。
为什么同时使用classtag和typetag?
通过删除typetag,编译器抛出以下错误:
错误:(96,48)t this.dataframe=this.spark.createdataframe(rdd)没有可用的typetag
以及移除类标记:
错误:(95,35)没有可用于t val rdd=this.getdata.rdd.map(f=mapfunction)的类标记
添加它们使这两种方法都很满意,一切都按预期进行。
找到一篇解释scala中类型擦除的好文章。

相关问题