我知道spark要想把一个Dataframe转换成某个类的数据集,需要一个编码器。不过,我通常可以用编码器和调用的main方法进行处理 .as[MyClass]
像这样:
val df = spark.read.parquet("something")
val myDS = df.as[MyClass]
只要定义了一个编码器就可以工作 MyClass
我想创建一个这样的函数
def hello[T](inputDataSet: Dataset[T])(implicit spark: SparkSession): Dataset[T] = {
val replacedDataFrame = inputDataSet
// do some transformation as Dataframe
.as[T]
replacedDataFrame
}
在那里我也返回了一个 Dataset[T]
. 但是,当我尝试投射Dataframe时 .as[T]
它抱怨说“没有找到它”。我只是在想既然它能理解我在做什么 Dataset[T]
它应该能理解相反的意思,但我想不能。有办法吗?
示例用例:
// function to replace a column with values from another DataSet
def swapColumnValue[T,K](inputDS: Dataset[T], joinableDS: Dataset[K])(implicit spark: SparkSession): Dataset[T] = {
val replacedDataFrame = inputDS
.join(broadcast(joinable), "col1") // exists in "joinableDS" and "inputDS"
.withColumnRenamed("col1", "to-drop")
.withColumnRenamed("col2", "col1") // "col2" exists only in "joinableDS"
.drop("to-drop")
.as[T]
replacedDataFrame
}
注意这不是我唯一的用例。但这里的问题是-我把一个 Dataset[T]
在对它做了一些操作之后,我想将返回指定为 Dataset[T]
也。一旦我做了那件事 join
它将 Dataset
到 Dataframe
它失去了类的定义 T
.
1条答案
按热度按时间92dk7w1h1#
尝试这个,我很难解释,但它解决了你得到的错误信息:
退货:
ds是t类型的数据集。