使用apachespark将mongodb数据保存为parquet文件格式

ttisahbt  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(513)

我是apachespark和scala编程语言的新手。
我试图实现的是从本地mongodb数据库中提取数据,然后使用apachespark和hadoop连接器将其保存为Parquet格式
这是我目前的代码:

package com.examples 
import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.rdd.RDD 
import org.apache.hadoop.conf.Configuration 
import org.bson.BSONObject 
import com.mongodb.hadoop.{MongoInputFormat, BSONFileInputFormat} 
import org.apache.spark.sql 
import org.apache.spark.sql.SQLContext 

object DataMigrator { 

    def main(args: Array[String])
    { 
        val conf = new SparkConf().setAppName("Migration    App").setMaster("local") 
        val sc = new SparkContext(conf) 
        val sqlContext = new SQLContext(sc) 

        // Import statement to implicitly convert an RDD to a DataFrame 
        import sqlContext.implicits._ 

        val mongoConfig = new Configuration() 
        mongoConfig.set("mongo.input.uri",   "mongodb://localhost:27017/mongosails4.case") 

        val mongoRDD = sc.newAPIHadoopRDD(mongoConfig, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject]);     

        val count = countsRDD.count()

        // the count value is aprox 100,000 
        println("================ PRINTING =====================") 
        println(s"ROW COUNT IS $count") 
        println("================ PRINTING =====================") 
    } 
}

问题是,为了将数据保存为Parquet文件格式,首先需要将mongordd变量转换为spark dataframe。我试过这样的方法:

// convert RDD to DataFrame
val myDf = mongoRDD.toDF()  // this lines throws an error
myDF.write.save("my/path/myData.parquet")

我得到的错误是: Exception in thread "main" scala.MatchError: java.lang.Object (of class scala.reflect.internal.Types.$TypeRef$$anon$6) 你们还有其他想法吗?我怎样才能把rdd转换成Dataframe,这样我就可以用Parquet格式保存数据了?
以下是mongodb集合中一个文档的结构:https://gist.github.com/kingtrocko/83a94238304c2d654fe4

7fyelxc5

7fyelxc51#

创建一个case类,表示存储在dbobject中的数据。 case class Data(x: Int, s: String) 然后,将rdd的值Map到case类的示例。 val dataRDD = mongoRDD.values.map { obj => Data(obj.get("x"), obj.get("s")) } 现在使用rdd[data],您可以使用sqlcontext创建一个dataframe val myDF = sqlContext.createDataFrame(dataRDD) 那会让你走的。如果需要的话,我可以稍后再解释。

相关问题