如何在spark中使用整个配置单元数据库并从外部文件读取sql查询?

anhgbhbe  于 2021-06-28  发布在  Hive
关注(0)|答案(1)|浏览(276)

我在azure中使用hortonworks沙盒和spark 1.6。我有一个用tpc-ds样本数据填充的hive数据库。我想从外部文件读取一些sql查询,并在spark的配置单元数据集上运行它们。我使用spark中的hive数据库来完成这个主题,它只是在我的数据集中使用了一个表,它也在spark中再次编写了sql查询,但是我需要定义一个完整的数据集作为我的源来查询它,我想我应该使用dataframes,但是我不确定,也不知道怎么做!我还想从外部.sql文件导入sql查询,不要再写查询了!你能指导我怎么做吗?非常感谢,贝斯特!

brc7rcf0

brc7rcf01#

spark可以直接从配置单元表中读取数据。您可以使用spark创建、删除配置单元表,甚至可以通过spark执行所有与配置单元hql相关的操作。为此,你需要使用Spark HiveContext 来自spark文档:
spark hivecontext提供了基本sqlcontext提供的功能的超集。其他特性包括使用更完整的hiveql解析器编写查询的能力、对配置单元udf的访问以及从配置单元表读取数据的能力。要使用hivecontext,不需要现有的配置单元设置。
有关更多信息,请访问spark文档
为了避免在代码中编写sql,可以使用属性文件,在其中可以放置所有配置单元查询,然后可以在代码中使用键。
请参见下面spark hivecontext的实现和spark scala中属性文件的使用。

package com.spark.hive.poc

import org.apache.spark._
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql._
import org.apache.spark._
import org.apache.spark.sql.DataFrame;
import org.apache.spark.rdd.RDD;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.hive.HiveContext;

//Import Row.
import org.apache.spark.sql.Row;
//Import Spark SQL data types
import org.apache.spark.sql.types.{ StructType, StructField, StringType };

object ReadPropertyFiles extends Serializable {

  val conf = new SparkConf().setAppName("read local file");

  conf.set("spark.executor.memory", "100M");
  conf.setMaster("local");

  val sc = new SparkContext(conf)
  val sqlContext = new HiveContext(sc)

  def main(args: Array[String]): Unit = {

    var hadoopConf = new org.apache.hadoop.conf.Configuration();
    var fileSystem = FileSystem.get(hadoopConf);
    var Path = new Path(args(0));
    val inputStream = fileSystem.open(Path);
    var Properties = new java.util.Properties;
    Properties.load(inputStream);

    //Create an RDD
    val people = sc.textFile("/user/User1/spark_hive_poc/input/");
    //The schema is encoded in a string
    val schemaString = "name address";

    //Generate the schema based on the string of schema
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)));

    //Convert records of the RDD (people) to Rows.
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim));
    //Apply the schema to the RDD.
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);
    peopleDataFrame.printSchema();

    peopleDataFrame.registerTempTable("tbl_temp")

    val data = sqlContext.sql(Properties.getProperty("temp_table"));

    //Drop Hive table
    sqlContext.sql(Properties.getProperty("drop_hive_table"));
    //Create Hive table
    sqlContext.sql(Properties.getProperty("create_hive_tavle"));
    //Insert data into Hive table
    sqlContext.sql(Properties.getProperty("insert_into_hive_table"));
    //Select Data into Hive table
    sqlContext.sql(Properties.getProperty("select_from_hive")).show();

    sc.stop

  }
}

属性文件中的条目:

temp_table=select * from tbl_temp
drop_hive_table=DROP TABLE IF EXISTS default.test_hive_tbl
create_hive_tavle=CREATE TABLE IF NOT EXISTS default.test_hive_tbl(name string, city string) STORED AS ORC
insert_into_hive_table=insert overwrite table default.test_hive_tbl select * from tbl_temp
select_from_hive=select * from default.test_hive_tbl

运行此作业的spark submit命令:

[User1@hadoopdev ~]$ spark-submit --num-executors 1 \
--executor-memory 100M --total-executor-cores 2 --master local \
--class com.spark.hive.poc.ReadPropertyFiles Hive-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
/user/User1/spark_hive_poc/properties/sql.properties

注意:属性文件位置应该是hdfs位置。

相关问题