我有一个csv文件..我使用sql上下文将其加载到程序中,并将其上载到Dataframe中。现在我想将此csv文件存储到mongodbcollection中。但我无法将其转换为javapairedrdd。请帮助。。。
我的密码是。。。
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.bson.BSONObject;
import org.apache.spark.api.java.JavaPairRDD;
import com.mongodb.hadoop.MongoOutputFormat;
public class CSVReader {
public static void main(String args[]){
SparkConf conf = new SparkConf().setAppName("sparkConnection").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
/* To load a csv file frol given location*/
DataFrame df = sqlContext.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")//Automaticaaly infers the data
.option("header", "true")//To include the headers in dataframe
.load("D:/SparkFiles/abc.csv");
}
}
1条答案
按热度按时间rkue9o1l1#
你显然研究得不够。
因为如果你有,你就会知道dataframe是schema+rdd的组合。
假设您发布的代码工作正常,您可以从df读取rdd作为
df.rdd