scala - How can I create a Hive table from a Spark DataFrame using the DataFrame's schema?

6uxekuva · Published 2021-07-09 in Spark
Follow (0) | Answers (5) | Views (461)

I want to create a Hive table using my Spark DataFrame's schema. How can I do that?
For fixed columns, I can use:

val createTableQuery = "CREATE TABLE my_table (a STRING, b STRING, c DOUBLE)"
sparkSession.sql(createTableQuery)

But my DataFrame has many columns, so is there a way to generate such a query automatically?


pdsfdshx1#

Starting with Spark 2.4, you can use the function DataFrame.schema.toDDL to get the column names and types (even for nested structs).
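A minimal sketch of that approach (assuming a SparkSession `spark`, an existing DataFrame `df`, and a placeholder table name):

```scala
// toDDL emits "col1 INT, col2 STRING, ..." from the DataFrame's schema,
// including nested struct types, so it can be spliced into a CREATE TABLE.
val ddl = df.schema.toDDL
spark.sql(s"CREATE TABLE my_new_table ($ddl)")

// Optionally populate the new table from the DataFrame afterwards:
df.write.insertInto("my_new_table")
```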


kuhbmx9i2#

Another approach is to use the methods available on StructType: sql, simpleString, treeString, and so on.
You can create DDL from a DataFrame's schema, and you can also create a DataFrame's schema from DDL.
Here is an example (works up to Spark 2.3):

// Set up a sample test table to create a DataFrame from
spark.sql("DROP DATABASE IF EXISTS hive_test CASCADE")
spark.sql("CREATE DATABASE hive_test")
spark.sql("USE hive_test")
spark.sql("""CREATE TABLE hive_test.department(
  department_id int,
  department_name string
)""")
spark.sql("""INSERT INTO hive_test.department VALUES (101, "Oncology")""")

spark.sql("SELECT * FROM hive_test.department").show()

/***************************************************************/

Now I have the DataFrame. In a real scenario you would use a DataFrame reader to create it from a file or database. Let's use its schema to create the DDL:

// Create DDL from the Spark DataFrame schema using the simpleString function

// Regex to remove unwanted characters (struct<, >, :)
val sqlrgx = """(struct<)|(>)|(:)""".r

// Create the DDL SQL string and remove unwanted characters
val sqlString = sqlrgx.replaceAllIn(spark.table("hive_test.department").schema.simpleString, " ")

// Create the table with sqlString
spark.sql(s"create table hive_test.department2( $sqlString )")

From Spark 2.4 onward, you can use the fromDDL and toDDL methods on StructType:

import org.apache.spark.sql.types.StructType

val fddl = """
  department_id int,
  department_name string,
  business_unit string
  """

// Easily create a StructType from a DDL string using fromDDL
val schema3: StructType = StructType.fromDDL(fddl)

// Create a DDL string from the StructType using toDDL
val tddl = schema3.toDDL

spark.sql("drop table if exists hive_test.department2 purge")

// Create the table using the string tddl
spark.sql(s"""create table hive_test.department2 ( $tddl )""")

// Test by inserting sample rows and selecting
spark.sql("""INSERT INTO hive_test.department2 VALUES (101, "Oncology", "MDACC Texas")""")
spark.table("hive_test.department2").show()
spark.sql("drop table hive_test.department2")

gmxoilav3#

From your question, it seems you want to create a table in Hive using your DataFrame's schema. But as you said, the DataFrame has many columns, so there are two options:
The first is to create the Hive table directly from the DataFrame.
The second is to take this DataFrame's schema and create the table in Hive from it.
Consider this code:

package hive.example

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object checkDFSchema extends App {
  val cc = new SparkConf
  val sc = new SparkContext(cc)
  val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
  // First option: create the Hive table directly from the DataFrame
  val DF = sparkSession.sql("select * from salary")
  DF.createOrReplaceTempView("tempTable")
  sparkSession.sql("Create table yourtable as select * from tempTable")
  //Second option for creating hive table from schema
  val oldDFF = sparkSession.sql("select * from salary")
  // Generate the schema from the DataFrame
  val schema = oldDFF.schema
  // Generate an RDD of your data
  val rowRDD = sc.parallelize(Seq(Row(100, "a", 123)))
  // Create a new DF from the data and schema
  val newDFwithSchema = sparkSession.createDataFrame(rowRDD, schema)
  newDFwithSchema.createOrReplaceTempView("tempTable")
  sparkSession.sql("create table FinalTable AS select * from tempTable")
}
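
As a side note, if the goal is simply a Hive table whose layout matches the DataFrame, DataFrameWriter.saveAsTable does the create-and-populate in one step (a sketch, assuming Hive support is enabled and the table name is a placeholder):

```scala
// Creates a Hive table whose columns are derived from df's schema,
// then writes df's rows into it.
df.write.mode("overwrite").saveAsTable("my_table")
```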

ymzxtsji4#

Assuming you are using Spark 2.1.0 or later and my_DF is your DataFrame:

// Get the schema as a comma-separated string of "field datatype" pairs
StructType my_schema = my_DF.schema();
String columns = Arrays.stream(my_schema.fields())
                       .map(field -> field.name() + " " + field.dataType().typeName())
                       .collect(Collectors.joining(","));

// Drop the table if already created
spark.sql("drop table if exists my_table");
// Create the table using the DataFrame schema
spark.sql("create table my_table(" + columns + ") "
    + "row format delimited fields terminated by '|' location '/my/hdfs/location'");
// Write the DataFrame data to the HDFS location for the created Hive table
my_DF.write()
     .format("com.databricks.spark.csv")
     .option("delimiter", "|")
     .mode("overwrite")
     .save("/my/hdfs/location");

Another way is to use a temp table:

my_DF.createOrReplaceTempView("my_temp_table");
spark.sql("drop table if exists my_table");
spark.sql("create table my_table as select * from my_temp_table");

mcvgt66p5#

Here is a PySpark version that creates a Hive table from a Parquet file. You may have generated the Parquet files with an inferred schema and now need to push the definition to the Hive metastore. You can also push the definition to systems like AWS Glue or AWS Athena, not just to the Hive metastore. Here I use spark.sql to push/create the permanent table.


# Location where my parquet files are present.
df = spark.read.parquet("s3://my-location/data/")

buf = []
buf.append('CREATE EXTERNAL TABLE test123 (')
keyanddatatypes = df.dtypes
sizeof = len(keyanddatatypes)
print("size----------", sizeof)
count = 1
for eachvalue in keyanddatatypes:
    print(count, sizeof, eachvalue)
    if count == sizeof:
        total = str(eachvalue[0]) + ' ' + str(eachvalue[1])
    else:
        total = str(eachvalue[0]) + ' ' + str(eachvalue[1]) + ','
    buf.append(total)
    count = count + 1

buf.append(' )')
buf.append(' STORED as parquet')
buf.append(' LOCATION ')
buf.append("'s3://my-location/data/'")
## partition by pt
tabledef = ''.join(buf)

print("---------print definition ---------")
print(tabledef)
## Create the table using spark.sql. Assuming you are using Spark 2.1+
spark.sql(tabledef)
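
The buffer-and-counter loop above can be condensed with a join. Here is a self-contained sketch that builds the same kind of DDL string from a dtypes-style list of (column, type) pairs; the table name and location are placeholders:

```python
def build_ddl(table, dtypes, location):
    """Build a CREATE EXTERNAL TABLE statement from (column, type) pairs,
    in the shape returned by DataFrame.dtypes."""
    cols = ", ".join(f"{name} {dtype}" for name, dtype in dtypes)
    return (f"CREATE EXTERNAL TABLE {table} ({cols}) "
            f"STORED AS parquet LOCATION '{location}'")

ddl = build_ddl("test123",
                [("id", "bigint"), ("name", "string")],
                "s3://my-location/data/")
print(ddl)
# CREATE EXTERNAL TABLE test123 (id bigint, name string) STORED AS parquet LOCATION 's3://my-location/data/'
```

In real use you would pass `df.dtypes` as the second argument and then run `spark.sql(ddl)`.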
