scala spark函数具有泛型dataset[t]参数,并且还返回dataset[t]?

bbmckpt7  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(302)

我知道spark要想把一个Dataframe转换成某个类的数据集,需要一个编码器。不过,我通常可以用编码器和调用的main方法进行处理 .as[MyClass] 像这样:

val df = spark.read.parquet("something")
val myDS = df.as[MyClass]

只要定义了一个编码器就可以工作 MyClass 我想创建一个这样的函数

def hello[T](inputDataSet: Dataset[T])(implicit spark: SparkSession): Dataset[T] = {

    val replacedDataFrame = inputDataSet
      // do some transformation as Dataframe
      .as[T]

    replacedDataFrame

}

在那里我也返回了一个 Dataset[T] . 但是,当我尝试投射Dataframe时 .as[T] 它抱怨说“没有找到它”。我只是在想既然它能理解我在做什么 Dataset[T] 它应该能理解相反的意思,但我想不能。有办法吗?
示例用例:

// function to replace a column with values from another DataSet
def swapColumnValue[T,K](inputDS: Dataset[T], joinableDS: Dataset[K])(implicit spark: SparkSession): Dataset[T] = {

    val replacedDataFrame = inputDS
      .join(broadcast(joinable), "col1") // exists in "joinableDS" and "inputDS"
      .withColumnRenamed("col1", "to-drop")
      .withColumnRenamed("col2", "col1") // "col2" exists only in "joinableDS"
      .drop("to-drop") 
      .as[T]

    replacedDataFrame

}

注意这不是我唯一的用例。但这里的问题是-我把一个 Dataset[T] 在对它做了一些操作之后,我想将返回指定为 Dataset[T] 也。一旦我做了那件事 join 它将 DatasetDataframe 它失去了类的定义 T .

92dk7w1h

92dk7w1h1#

尝试这个,我很难解释,但它解决了你得到的错误信息:

import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import spark.implicits._
import org.apache.spark.sql.Encoders

case class T(name: String, age: Long)
case class K(name: String, age2: Long)

val dt = Seq(T("Andy", 32), T("John", 33), T("Bob", 33)).toDS()
dt.show()

val dk = Seq(K("Andy", 32), K("John", 133), K("Bob", 245)).toDS()
dk.show()

implicit val sqlContext: SparkSession = spark

def swapColumnValue[T,K](inputDS: Dataset[T], joinableDS: Dataset[K])(implicit spark: SparkSession, encoder: Encoder[T]): Dataset[T] = {
//def swapColumnValue[T,K](inputDS: Dataset[T], joinableDS: Dataset[K]) : DataFrame = {
    val replacedDataFrame = inputDS
      .join(broadcast(joinableDS), "name")  
      .withColumnRenamed("age", "to-drop")
      .withColumnRenamed("age2", "age")  
      .drop("to-drop") 
      .as[T]

    replacedDataFrame
}

val ds = swapColumnValue(dt,dk) 
ds.show(false)

退货:

+----+---+
|name|age|
+----+---+
|Andy| 32|
|John| 33|
| Bob| 33|
+----+---+

+----+----+
|name|age2|
+----+----+
|Andy|  32|
|John| 133|
| Bob| 245|
+----+----+

+----+---+
|name|age|
+----+---+
|Andy|32 |
|John|133|
|Bob |245|
+----+---+

ds是t类型的数据集。

相关问题