通过sparksql访问json serde配置单元表

7kqas0il  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(466)

我是新来的。以何种方式,可以通过sparksql读取带有json serde的配置单元表。任何示例代码或文档都可以。

aydmsdu9

aydmsdu91#

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object ReadJson {
  val spark = SparkSession // Building Spark object
    .builder()
    .appName("ReadJson")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","RareJson") // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext // Get the spark context

  val sqlContext = spark.sqlContext  // Get the spark Sql Context

  val input = "hdfs://user/..../..../..../file.json" //hdfs path to the file or directory

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)  // application logs

    try {

      val jsonDf = sqlContext
        .read    
        .json(input) // reading the Json file and getting a DataFrame

      jsonDf.show(truncate = false) // showing some data in the console

      jsonDf.createOrReplaceTempView("my_table") // to work with SQL first we create a temporal view

      sqlContext.sql("""SELECT * FROM my_table""").show() //simple query

      // To have the opportunity to view the web console of Spark: http://localhost:4041/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

spark编程指南
http://spark.apache.org/docs/2.3.0/sql-programming-guide.html#overview

相关问题