使用spark和scala将数据插入配置单元表中的问题

3pmvbmvn  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(374)

我是新来的。我有件事想做。
我已经创建了两个数据流;第一个从文本文件中读取数据,并使用hivecontext将其注册为一个诱人的文件。另一个不断地从kafka获取rdd,对于每个rdd,它创建数据流并将内容注册为诱人的。最后,我在一个键上连接这两个临时表以获得最终结果集。我想在配置单元表中插入该结果集。但我没有主意了。尝试遵循一些示例,但是只在配置单元中创建一个包含一列的表,而且太不可读。您能告诉我如何在特定的数据库和配置单元表中插入结果吗。请注意,我可以看到使用show函数连接的结果,所以真正的挑战在于在配置单元表中插入。
下面是我正在使用的代码。

imports.....

    object MSCCDRFilter {        
     def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("Flume, Kafka and Spark MSC CDRs Manipulation")

        val sc = new SparkContext(sparkConf)
        val sqlContext = new HiveContext(sc)
        import sqlContext.implicits._
        val cgiDF = sc.textFile("file:///tmp/omer-learning/spark/dim_cells.txt").map(_.split(",")).map(p => CGIList(p(0).trim, p(1).trim, p(2).trim,p(3).trim)).toDF()
        cgiDF.registerTempTable("my_cgi_list")
        val CGITable=sqlContext.sql("select *"+
          " from my_cgi_list")
        CGITable.show()    // this CGITable is a structure I defined in the project
                val streamingContext = new StreamingContext(sc, Seconds(10)
        val zkQuorum="hadoopserver:2181"
        val topics=Map[String, Int]("FlumeToKafka"->1)

        val messages: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(streamingContext,zkQuorum,"myGroup",topics)

        val logLinesDStream = messages.map(_._2)  //获取数据
        logLinesDStream.print()
        val MSCCDRDStream = logLinesDStream.map(MSC_KPI.parseLogLine) // change MSC_KPI to MCSCDR_GO if you wanna change the class
// MSCCDR_GO and MSC_KPI are structures defined in the project    
        MSCCDRDStream.foreachRDD(MSCCDR => {
          println("+++++++++++++++++++++NEW RDD ="+ MSCCDR.count())

          if (MSCCDR.count() == 0) {
            println("==================No logs received in this time interval=================")
          } else {

            val dataf=sqlContext.createDataFrame(MSCCDR)
            dataf.registerTempTable("hive_msc")
            cgiDF.registerTempTable("my_cgi_list")
            val sqlquery=sqlContext.sql("select a.cdr_type,a.CGI,a.cdr_time, a.mins_int, b.Lat, b.Long,b.SiteID from hive_msc a left join my_cgi_list b"
            +" on a.CGI=b.CGI")
            sqlquery.show()
            sqlContext.sql("SET hive.exec.dynamic.partition = true;")
            sqlContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict;")
            sqlquery.write.mode("append").partitionBy("CGI").saveAsTable("omeralvi.msc_data")      

            val FilteredCDR = sqlContext.sql("select p.*, q.*  " +
              " from MSCCDRFiltered p left join my_cgi_list q " +
              "on p.CGI=q.CGI ")

            println("======================print result =================")
            FilteredCDR.show() 
        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }
b4qexyjb

b4qexyjb1#

我使用以下方法成功地向hive写入了一些内容:

dataFrame
   .coalesce(n)
   .write
   .format("orc")
   .options(Map("path" -> savePath))
   .mode(SaveMode.Append)
   .saveAsTable(fullTableName)

我们使用分区的尝试没有得到贯彻,因为我认为我们想要的分区列有一些问题。
唯一的限制是并发写操作,如果表还不存在,那么任何试图创建表的任务(因为它在第一次尝试写入表时不存在)都将异常退出。
请注意,在流式应用程序中写入配置单元通常是糟糕的设计,因为您经常会写入许多小文件,这对于读取和存储来说非常低效。因此,如果要比每小时左右更频繁地写入hive,则应该确保包含压缩逻辑,或者添加更适合事务数据的中间存储层。

相关问题