import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object ReadCsv {

  val spark = SparkSession
    .builder()
    .appName("ReadCsv")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4") // lower the default number of shuffle partitions (200) to something reasonable for this data
    .config("spark.app.id", "ReadCsv")           // silences a Metrics warning in local mode
    .getOrCreate()

  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.ERROR)
    try {
      val df = spark
        .read
        .csv("/path/directory_to_csv_files/") // read all the .csv files in this directory
        .cache()
      df.repartition(4) // four partitions, so we get four output files
        .write
        .parquet("/path/directory_to_parquet_files/") // Parquet output is Snappy-compressed by default
      // To write uncompressed Parquet instead, set this before the write:
      // spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")
      // Keep the application alive so the Spark web UI can be inspected at http://localhost:4040/
      println("Press Enter to exit...")
      scala.io.StdIn.readLine()
    } finally {
      spark.stop()
      println("SparkSession stopped")
    }
  }
}
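The snippet above reads the CSV files with Spark's defaults, so every column comes back as a string and any header row is treated as data. If the files do have a header line, the standard DataFrameReader options `header` and `inferSchema` handle that. A minimal sketch, assuming the same `spark` session and placeholder path as above (`dfWithHeader` is just an illustrative name):

val dfWithHeader = spark
  .read
  .option("header", "true")      // use the first line of each file as column names
  .option("inferSchema", "true") // let Spark guess column types (costs an extra pass over the data)
  .csv("/path/directory_to_csv_files/") // same placeholder path as above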
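As an alternative to the session-wide `spark.sql.parquet.compression.codec` setting mentioned in the comments, the codec can also be chosen per write through the DataFrameWriter's `compression` option. A sketch under the same assumptions (`spark` and `df` as above, placeholder path kept as-is):

df.repartition(4)
  .write
  .option("compression", "none") // or "snappy" (the default), "gzip", etc.
  .parquet("/path/directory_to_parquet_files/") // same placeholder path as above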
1 Answer
I used your Scala code as an example, but it is almost the same in Python. I also added some comments and explanations.