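I am using Spark 2.4.7. I expect it to prune unneeded columns when reading Parquet files, but I have not been able to demonstrate this behavior.
Here is my test code: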
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.io.StdIn

object Main {
  def main(args: Array[String]): Unit = {
    val path = "/tmp/spark-test"
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    import spark.implicits._

    // Generate and save some data
    (0 to 1000).map(i =>
      Full(Some(i), Some(i.formatted("%d")), Some((0 to i).map(j => j)))
    ).toDS().write.mode(SaveMode.Overwrite).format("parquet").save(path)

    // Read the full dataset (as comparison baseline)
    println("Reading full data")
    val fullDs = spark.read.schema(spark.emptyDataset[Full].schema).parquet(path)
    println(fullDs.count())
    fullDs.explain(true)

    // Try to read only one column
    println("Reading partial data")
    val partialDs = spark.read.schema(spark.emptyDataset[PartialInt].schema).parquet(path)
    println(partialDs.count())
    partialDs.explain(true)

    // Keep the session alive so the Spark UI can be browsed
    StdIn.readLine()
  }

  /** Possible fields */
  trait Mixin {
    val intF: Option[Int]
    val strF: Option[String]
    val arrayF: Option[Seq[Int]]
  }

  /** Canonical representation, implementing all fields. */
  case class Full(intF: Option[Int], strF: Option[String], arrayF: Option[Seq[Int]]) extends Mixin

  /** Partial representation, avoiding loading unnecessary columns */
  case class PartialInt(intF: Option[Int]) extends Mixin {
    override val strF: Option[String] = ???
    override val arrayF: Option[Seq[Int]] = ???
  }
}
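In the Spark UI, the input size of the two read stages appears to be the same, and I cannot see any clear hint in the printed plans. Is there a way to clearly demonstrate the column pruning mechanism?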
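For reference, one check that might make pruning visible is to look at the schema each file scan node in the physical plan requests from the reader. This is only a sketch under assumptions: `PruningCheck` and `scannedColumns` are hypothetical names, `FileSourceScanExec` is an internal Spark class rather than a stable public API, and the `select("intF")` projection is an illustrative query that is not part of the test code above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.FileSourceScanExec

// Hypothetical helper object, not part of the original test code.
object PruningCheck {
  // For every file scan in the physical plan, list the column names
  // that the scan asks the file reader to produce.
  def scannedColumns(df: DataFrame): Seq[Seq[String]] =
    df.queryExecution.executedPlan.collect {
      case scan: FileSourceScanExec => scan.requiredSchema.fieldNames.toSeq
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    val path = "/tmp/spark-test" // assumes the data written by the test code exists

    val df = spark.read.parquet(path)
    // Scan columns when every column is requested
    println(scannedColumns(df))
    // Scan columns when only intF is projected (illustrative query)
    println(scannedColumns(df.select("intF")))

    spark.stop()
  }
}
```

If the two printed lists differ, the projection reached the scan. The same information should also appear as the `ReadSchema` entry of the `FileScan parquet` line in the physical plan printed by `explain(true)`.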
Kind regards, Alexis.