我有以下代码:
public class IPCCodes {
public static class IPCCount implements Serializable {
public IPCCount(long permid, int year, int count, String ipc) {
this.permid = permid;
this.year = year;
this.count = count;
this.ipc = ipc;
}
public long permid;
public int year;
public int count;
public String ipc;
}
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("IPC codes");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
DataFrame df = sqlContext.sql("SELECT * FROM test.some_table WHERE year>2004");
JavaRDD<Row> rdd = df.javaRDD();
JavaRDD<IPCCount> map = rdd.flatMap(new FlatMapFunction<Row, IPCCount>() {
@Override
public Iterable<IPCCount> call(Row row) throws Exception {
List<IPCCount> counts = new ArrayList<>();
try {
String codes = row.getString(7);
for (String s : codes.split(",")) {
if(s.length()>4){
counts.add(new IPCCount(row.getLong(4), row.getInt(6), 1, s.substring(0, 4)));
}
}
} catch (NumberFormatException e) {
System.out.println(e.getMessage());
}
return counts;
}
});
我从配置单元表创建了dataframe,并应用flatmap函数来拆分ipc代码(该字段是配置单元表中的字符串数组),然后我需要使用count per permid和year聚合代码,结果表应该是permid/year/ipc/count。
最有效的方法是什么?
1条答案
按热度按时间7bsow1i61#
如果你想要一个
DataFrame
作为输出,没有充分的理由使用RDD
以及flatMap
. 据我所知,使用基本的sparksql函数,一切都可以很容易地处理。使用scala: