如何将csv转换成hdfs内部的Parquet文件

ar5n3qh5  于 2021-05-31  发布在  Hadoop
关注(0)|答案(1)|浏览(325)

我是个新手 Big Data ,所以 Hadoop 以及 hdfs 现在对我来说有点消失了,所以我请求帮助。现在我有4个文件在里面 csv 位于中的格式 HDFS 群集,我应该在 PARQUET 格式使用 Python ,我不知道该怎么做。我希望你能帮助我解决这个棘手的问题。

dl5txlt9

dl5txlt91#

我以你为例 Scala 代码,但在 Python 几乎是一样的。
我也发表了一些评论和解释

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object ReadCsv {
  val spark = SparkSession
    .builder()
    .appName("ReadCsv")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","ReadCsv") // To silence Metrics warning
    .getOrCreate()

  val sqlContext = spark.sqlContext

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {

      val df = sqlContext
        .read
        .csv("/path/directory_to_csv_files/") // Here we read the .csv files
        .cache()

      df.repartition(4) // we get four files
          .write
          .parquet("/path/directory_to_parquet_files/") // output format file.parquet.snappy by default
      // if we want parquet uncompressed before write we have to do:
      // sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

相关问题