// Loading a text file into an RDD from different storage systems
val lines = sc.textFile("file://../kv/mydata.log") // local file system
val lines = sc.textFile("s3n://../kv/mydata.log")  // Amazon S3 (s3n connector)
val lines = sc.textFile("hdfs://../kv/mydata.log") // HDFS
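Once loaded, the RDD supports the usual transformations and actions. A minimal sketch, assuming an existing SparkContext `sc` and a readable log file (the path here is hypothetical):

```scala
// Count the lines in the log that contain "ERROR" (hypothetical path)
val lines = sc.textFile("hdfs:///logs/mydata.log")
val errorCount = lines.filter(_.contains("ERROR")).count()
println(s"error lines: $errorCount")
```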
// Accessing Hive tables from Spark
import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

// Case class matching the schema of the Hive table below
case class People(name: String, age: Int, city: String, state: String, height: Double, weight: Double)

// Directory where Spark keeps managed tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession.builder
  .master("yarn")
  .appName("My Hive App")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS people(name String, age Int, city String, state String, height Double, weight Double) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
sql("LOAD DATA LOCAL INPATH 'file:/home/amalprakash32203955/data/people1.txt' INTO TABLE people")
sql("SELECT * FROM people").show()
2 answers

Answer by yr9zkbsy:
You can do this with HiveContext, as shown below:
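(The code for this answer did not survive extraction.) A minimal sketch of the legacy HiveContext approach from Spark 1.x, assuming an existing SparkContext `sc` and the `people` table created above:

```scala
// Spark 1.x: HiveContext wraps an existing SparkContext
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// Queries against Hive tables return a DataFrame
val adults = hiveContext.sql("SELECT name, age FROM people WHERE age > 30")
adults.show()
```

In Spark 2.x and later, `SparkSession` with `enableHiveSupport()` replaces HiveContext, as in the program above.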
Answer by dgiusagp:
RDDs are now considered legacy. You can read data from a Hive table directly into a DataFrame using the newer Spark API. Here is the link for Spark 2.3.0 (change the version to match your installation):

https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#hive-tables

The program above is a sample. You can store the result of the last line in a DataFrame and run the operations you would normally perform on an RDD (such as map and filter).