This article collects Java code examples for the org.apache.spark.sql.DataFrame class and shows how the class is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected open-source projects, so they should serve as a useful reference. Details of the DataFrame class:
Full class path: org.apache.spark.sql.DataFrame
Class name: DataFrame
Class description: none available
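Before turning to the collected snippets, here is a minimal, self-contained sketch of the typical Spark 1.x Java workflow around DataFrame: create an SQLContext, load data into a DataFrame, register it as a temporary table, and query it with SQL. The file name people.json, its columns (name, age), and the local[*] master are illustrative assumptions rather than details taken from the projects quoted below.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class DataFrameQuickStart {
  public static void main(String[] args) {
    // Local Spark context; in a cluster job the master is normally set by spark-submit.
    SparkConf conf = new SparkConf().setAppName("DataFrameQuickStart").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    // Each line of people.json is assumed to be a JSON object such as {"name":"Ann","age":32}.
    DataFrame people = sqlContext.read().json("people.json");
    people.printSchema();

    // Register the DataFrame as a temporary table and query it with SQL.
    people.registerTempTable("people");
    DataFrame adults = sqlContext.sql("SELECT name, age FROM people WHERE age >= 18");

    // In Spark 1.x, collect() on a DataFrame returns Row[].
    for (Row row : adults.collect()) {
      System.out.println(row.getString(0) + ", " + row.getLong(1));
    }
    sc.stop();
  }
}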
Code example source: databricks/learning-spark
SparkConf conf = new SparkConf();
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlCtx = new SQLContext(sc);
DataFrame input = sqlCtx.jsonFile(inputFile);
// Print the inferred schema and register the DataFrame as a temp table.
input.printSchema();
input.registerTempTable("tweets");
DataFrame topTweets = sqlCtx.sql("SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10");
Row[] result = topTweets.collect();
for (Row row : result) {
  System.out.println(row.get(0));
}
JavaRDD<String> topTweetText = topTweets.toJavaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return row.getString(0);
  }
});
JavaRDD<HappyPerson> happyPeopleRDD = sc.parallelize(peopleList);
DataFrame happyPeopleSchemaRDD = sqlCtx.applySchema(happyPeopleRDD, HappyPerson.class);
happyPeopleSchemaRDD.registerTempTable("happy_people");
// Register a Java UDF and call it from SQL.
sqlCtx.udf().register("stringLengthJava", new UDF1<String, Integer>() {
  @Override
  public Integer call(String str) throws Exception {
    return str.length();
  }
}, DataTypes.IntegerType);
DataFrame tweetLength = sqlCtx.sql("SELECT stringLengthJava('text') FROM tweets LIMIT 10");
Row[] lengths = tweetLength.collect();
for (Row row : lengths) {
  System.out.println(row.get(0));
}
Code example source: Impetus/Kundera
@Override
public void saveDataFrame(DataFrame dataFrame, Class<?> entityClazz, Map<String, Object> properties)
{
    dataFrame.sqlContext().sql("use " + (String) properties.get(KEYSPACE));
    dataFrame.write().insertInto((String) properties.get(TABLE));
}
Code example source: Impetus/Kundera
/**
 * Gets the data frame size.
 *
 * @param dataFrame
 *            the data frame
 * @return the data frame size
 */
public int getDataFrameSize(DataFrame dataFrame)
{
    long l = dataFrame != null ? dataFrame.count() : 0;
    if (l < Integer.MIN_VALUE || l > Integer.MAX_VALUE)
    {
        logger.error(l + " cannot be cast to int without changing its value.");
        return 0;
    }
    return (int) l;
}
Code example source: phuonglh/vn.vitk
// The excerpt begins in the middle of a training method's signature:
    String classifierFileName, int numHiddenUnits) {
  this.sqlContext = new SQLContext(jsc);
  trainingData.cache();
  trainingData.show(false);
  trainingData.registerTempTable("dfTable");
  Row row = sqlContext.sql("SELECT MAX(label) as maxValue from dfTable").first();
  int numLabels = (int) row.getDouble(0);
  numLabels++;
  // ... (the model fitting that produces 'result' is elided in this excerpt)
  DataFrame predictionAndLabel = result.select("prediction", "label");
  MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator().setMetricName("precision");
  if (verbose) {
    System.out.println("N = " + trainingData.count());
    System.out.println("D = " + vocabSize);
    System.out.println("K = " + numLabels);
  }
Code example source: phuonglh/vn.vitk
MarkovOrder order = MarkovOrder.values()[(Integer) params.getOrDefault(params.getMarkovOrder()) - 1];
ContextExtractor contextExtractor = new ContextExtractor(order, Constants.REGEXP_FILE);
JavaRDD<LabeledContext> contexts = contextExtractor.extract(dataset.javaRDD());
DataFrame dataFrame = dataset.sqlContext().createDataFrame(contexts, LabeledContext.class);
// ... (lines elided in this excerpt; the code below operates on a DataFrame named df)
JavaRDD<Row> wt = df.select("word", "label").javaRDD();
JavaPairRDD<String, Set<Integer>> tagDictionary = wt.mapToPair(new PairFunction<Row, String, Set<Integer>>() {
  private static final long serialVersionUID = 5865372074294028547L;
  // ... (the pair function body is elided in this excerpt)
});
df.registerTempTable("dft");
Row row = df.sqlContext().sql("SELECT MAX(label) as maxValue FROM dft").first();
this.numLabels = (int) row.getDouble(0) + 1;
JavaRDD<Row> rows = df.sqlContext().sql("SELECT label, features FROM dft").toJavaRDD();
Code example source: KeithSSmith/spark-compaction
public void compact(String inputPath, String outputPath) throws IOException {
  this.setCompressionAndSerializationOptions(inputPath, outputPath);
  this.outputCompressionProperties(this.outputCompression);
  // Defining Spark Context with a generic Spark Configuration.
  SparkConf sparkConf = new SparkConf().setAppName("Spark Compaction");
  JavaSparkContext sc = new JavaSparkContext(sparkConf);
  if (this.outputSerialization.equals(TEXT)) {
    JavaRDD<String> textFile = sc.textFile(this.concatInputPath(inputPath));
    textFile.coalesce(this.splitSize).saveAsTextFile(outputPath);
  } else if (this.outputSerialization.equals(PARQUET)) {
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame parquetFile = sqlContext.read().parquet(this.concatInputPath(inputPath));
    parquetFile.coalesce(this.splitSize).write().parquet(outputPath);
  } else if (this.outputSerialization.equals(AVRO)) {
    // For this to work the files must end in .avro
    // Another issue is that when using compression the compression codec extension is not being added to the file name.
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame avroFile = sqlContext.read().format("com.databricks.spark.avro").load(this.concatInputPath(inputPath));
    avroFile.coalesce(this.splitSize).write().format("com.databricks.spark.avro").save(outputPath);
  } else {
    System.out.println("Did not match any serialization type: text, parquet, or avro. Received: " +
        this.outputSerialization);
  }
}
Code example source: phuonglh/vn.vitk
DataFrame df0 = (new SQLContext(jsc)).createDataFrame(jrdd, WhitespaceContext.class);
DataFrame df1 = model.transform(df0);
prediction = jsc.broadcast(df1.select("prediction").collect());
if (df1.count() > 0) {
  output = s.map(new WhitespaceClassificationFunction());
  // ... (the rest of this branch is elided in this excerpt)
}
Code example source: phuonglh/vn.vitk
  // ... (the start of the schema definition is elided in this excerpt)
  new StructField("dependency", DataTypes.StringType, false, Metadata.empty())
});
SQLContext sqlContext = new SQLContext(jsc);
DataFrame df = sqlContext.createDataFrame(rows, schema);
// ... (the if-condition choosing between text and JSON output is elided in this excerpt)
  df.select("dependency").write().text(outputFileName);
else
  df.repartition(1).write().json(outputFileName);
Code example source: phuonglh/vn.vitk
/**
 * Tags a list of sequences and returns a list of tag sequences.
 * @param sentences
 * @return a list of tagged sequences.
 */
public List<String> tag(List<String> sentences) {
  List<Row> rows = new LinkedList<Row>();
  for (String sentence : sentences) {
    rows.add(RowFactory.create(sentence));
  }
  StructType schema = new StructType(new StructField[]{
    new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
  });
  SQLContext sqlContext = new SQLContext(jsc);
  DataFrame input = sqlContext.createDataFrame(rows, schema);
  if (cmmModel != null) {
    DataFrame output = cmmModel.transform(input).repartition(1);
    return output.javaRDD().map(new RowToStringFunction(1)).collect();
  } else {
    System.err.println("Tagging model is null. You need to create or load a model first.");
    return null;
  }
}
Code example source: sectong/SparkToParquet
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, Flags.getInstance().getSlideInterval());
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.createDataFrame(rdd, ApacheAccessLog.class);
df.write().partitionBy("ipAddress", "method", "responseCode").mode(SaveMode.Append).parquet(Flags.getInstance().getParquetFile());
Code example source: oeljeklaus-you/UserActionAnalyzePlatform
  // ... (the preceding fields of the schema are elided in this excerpt)
  DataTypes.createStructField("pay_product_ids", DataTypes.StringType, true)));
DataFrame df = sqlContext.createDataFrame(rowsRDD, schema);
df.registerTempTable("user_visit_action");
for (Row _row : df.take(1)) {
  System.out.println(_row);
}
  // ... (the preceding fields of the second schema are elided in this excerpt)
  DataTypes.createStructField("sex", DataTypes.StringType, true)));
DataFrame df2 = sqlContext.createDataFrame(rowsRDD, schema2);
for (Row _row : df2.take(1)) {
  System.out.println(_row);
}
df2.registerTempTable("user_info");
Code example source: phuonglh/vn.vitk
/**
 * Creates an n-gram data frame from text lines.
 * @param lines
 * @return an n-gram data frame.
 */
DataFrame createNGramDataFrame(JavaRDD<String> lines) {
  JavaRDD<Row> rows = lines.map(new Function<String, Row>() {
    private static final long serialVersionUID = -4332903997027358601L;
    @Override
    public Row call(String line) throws Exception {
      return RowFactory.create(Arrays.asList(line.split("\\s+")));
    }
  });
  StructType schema = new StructType(new StructField[] {
    new StructField("words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
  });
  DataFrame wordDF = new SQLContext(jsc).createDataFrame(rows, schema);
  // build a bigram language model
  NGram transformer = new NGram().setInputCol("words").setOutputCol("ngrams").setN(2);
  DataFrame ngramDF = transformer.transform(wordDF);
  ngramDF.show(10, false);
  return ngramDF;
}
Code example source: oeljeklaus-you/UserActionAnalyzePlatform
/**
 * Gets the user action data within the specified date range.
 * @param sc
 * @param taskParam
 * @return
 */
private static JavaRDD<Row> getActionRDD(SQLContext sc, JSONObject taskParam)
{
    String startTime = ParamUtils.getParam(taskParam, Constants.PARAM_STARTTIME);
    String endTime = ParamUtils.getParam(taskParam, Constants.PARAM_ENDTIME);
    String sql = "select * from user_visit_action where date>='" + startTime + "' and date<='" + endTime + "'";
    DataFrame df = sc.sql(sql);
    return df.javaRDD();
}
Code example source: stackoverflow.com
SQLContext sqlcontext=new SQLContext(context);
DataFrame outDataFrame=sqlcontext.createDataFrame(finalOutPutRDD, WebHttpOutPutVO.class);
Properties prop = new java.util.Properties();
prop.setProperty("database", "Web_Session");
prop.setProperty("user", "user");
prop.setProperty("password", "pwd@123");
prop.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver");
outDataFrame.write().mode(org.apache.spark.sql.SaveMode.Append).jdbc("jdbc:sqlserver://<Host>:1433", "test_table", prop);
Code example source: com.cloudera.livy/livy-test-lib
@Override
public List<String> call(JobContext jc) throws Exception {
  InputStream source = getClass().getResourceAsStream("/testweet.json");
  // Save the resource as a file in HDFS (or the local tmp dir when using a local filesystem).
  URI input;
  File local = File.createTempFile("tweets", ".json", jc.getLocalTmpDir());
  Files.copy(source, local.toPath(), StandardCopyOption.REPLACE_EXISTING);
  FileSystem fs = FileSystem.get(jc.sc().sc().hadoopConfiguration());
  if ("file".equals(fs.getUri().getScheme())) {
    input = local.toURI();
  } else {
    String uuid = UUID.randomUUID().toString();
    Path target = new Path("/tmp/" + uuid + "-tweets.json");
    fs.copyFromLocalFile(new Path(local.toURI()), target);
    input = target.toUri();
  }
  SQLContext sqlctx = useHiveContext ? jc.hivectx() : jc.sqlctx();
  sqlctx.jsonFile(input.toString()).registerTempTable("tweets");
  List<String> tweetList = new ArrayList<>();
  Row[] result =
      (Row[]) (sqlctx.sql("SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10")
          .collect());
  for (Row r : result) {
    tweetList.add(r.toString());
  }
  return tweetList;
}
Code example source: Impetus/Kundera
/**
 * Register table for json.
 *
 * @param tableName
 *            the table name
 * @param dataSourcePath
 *            the data source path
 * @param sqlContext
 *            the sql context
 */
private void registerTableForJson(String tableName, String dataSourcePath, HiveContext sqlContext)
{
    sqlContext.jsonFile(dataSourcePath).registerTempTable(tableName);
}
Code example source: phuonglh/vn.vitk
DataFrame df = sqlContext.createDataFrame(jrdd, WhitespaceContext.class);
df.show(false);
System.out.println("N = " + df.count());
df.groupBy("label").count().show();
// ... (the model fitting that produces 'predictions' is elided in this excerpt)
predictions.show();
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator().setMetricName("precision");
double accuracy = evaluator.evaluate(predictions);
Code example source: phuonglh/vn.vitk
@Override
public DataFrame transform(DataFrame dataset) {
  JavaRDD<Row> output = dataset.javaRDD().map(new DecodeFunction());
  StructType schema = new StructType(new StructField[]{
    new StructField("sentence", DataTypes.StringType, false, Metadata.empty()),
    new StructField("prediction", DataTypes.StringType, false, Metadata.empty())
  });
  return dataset.sqlContext().createDataFrame(output, schema);
}
Code example source: stackoverflow.com
// ... (creation of the schema and of the two List<Row> buffers is elided in this excerpt)
dataForFirstDF.add(RowFactory.create("a/b/c/d/e", "abc"));
dataForFirstDF.add(RowFactory.create("x/y/z", "xyz"));
DataFrame df1 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForFirstDF), schema);
df1.show();
dataForSecondDF.add(RowFactory.create("a/a/a", "qwe"));
dataForSecondDF.add(RowFactory.create("a/b/c", "qwe"));
DataFrame df2 = sqlContext.createDataFrame(javaSparkContext.parallelize(dataForSecondDF), schema);
// pathContains is a join-condition Column defined earlier in the original answer.
DataFrame result = df1.as("firstDF").join(df2.as("secondDF"), pathContains, "leftsemi");
result.show();
Code example source: Quetzal-RDF/quetzal
public static void main(String[] args)
{
  // SparkConf conf = new SparkConf().setAppName("App-mt").setMaster("local[2]");
  // SparkConf conf = new SparkConf().setAppName("App-mt").setMaster("spark://Kavithas-MBP.home:7077");
  SparkConf conf = new SparkConf().setAppName("App-mt").setMaster("spark://kavithas-mbp.watson.ibm.com:7077");
  JavaSparkContext sc = new JavaSparkContext(conf);
  HiveContext sqlContext = new HiveContext(sc.sc());
  DataFrame urls = sqlContext.read().json("/tmp/urls.json");
  urls.registerTempTable("urls");
  DataFrame temp = sqlContext.sql("select * from urls");
  temp.show();
  sqlContext.sql("add jar /tmp/quetzal.jar");
  sqlContext.sql("create temporary function webservice as 'com.ibm.research.rdf.store.utilities.WebServiceGetUDTF'");
  DataFrame drugs = sqlContext.sql("select webservice(\"drug,id,action\", \"url\", \"\", \"GET\", \"xs=http://www.w3.org/2001/XMLSchema\", \"//row\",\"drug\",\"./drug\","
      + " \"<string>\", \"id\", \"./id\",\"<string>\", \"action\", \"./action\", \"<string>\", url) as (drug, drug_typ, id, id_typ, action, action_typ) from urls");
  drugs.show();
  System.out.println("Num rows:" + drugs.count());
}
The content above was collected from the internet; if it infringes your rights, please contact the author to have it removed.