I am using Spark 1.6 and trying to solve the following problem. I have a JavaPairRDD<String, Map<String, List<String>>>. I want to save it as multiple output files, where the JavaPairRDD's key becomes the outer directory and each key of the inner Map becomes a file name.
For example, if the JavaPairRDD contains the following data:
<"A", <{"A1",["a1","b1","c1"]}, {"A2",["a2","b2","c2"]}>>
<"B", <{"B1",["bb1","bb2","bb3"]}>>
then the output directories should look like this:
/output/A/A1 (content of A1 should have [a1,b1,c1])
/output/A/A2 (content of A2 should have [a2,b2,c2])
/output/B/B1 (content of B1 should have [bb1,bb2,bb3])
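In other words, the desired layout amounts to flattening the nested structure into (outerKey + "/" + innerKey, contents) pairs before anything is written. A minimal sketch of that flattening in plain Java (no Spark), with an illustrative class name `FlattenSketch` that is not part of the original code:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlattenSketch {
    // Flatten the nested map into path -> contents entries,
    // where the path is outerKey + "/" + innerKey.
    static Map<String, List<String>> flatten(
            Map<String, Map<String, List<String>>> data) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, List<String>>> outer : data.entrySet()) {
            for (Map.Entry<String, List<String>> inner : outer.getValue().entrySet()) {
                out.put(outer.getKey() + "/" + inner.getKey(), inner.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Map<String, List<String>>> data = new LinkedHashMap<>();
        Map<String, List<String>> a = new LinkedHashMap<>();
        a.put("A1", Arrays.asList("a1", "b1", "c1"));
        a.put("A2", Arrays.asList("a2", "b2", "c2"));
        data.put("A", a);
        Map<String, List<String>> b = new LinkedHashMap<>();
        b.put("B1", Arrays.asList("bb1", "bb2", "bb3"));
        data.put("B", b);

        // Prints the paths that should become output files:
        System.out.println(flatten(data).keySet()); // [A/A1, A/A2, B/B1]
    }
}
```

On an RDD the same reshaping would be done with a pair transformation (e.g. `flatMapToPair`) before saving, so that each record already carries its final path as the key.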
I have the code below, but I am not sure how to change `MultipleTextOutputFormat` so that it iterates over the value Map.
public static void main(String[] args) {
    JavaPairRDD<String, Map<String, List<String>>> pair;
    pair.saveAsHadoopFile(directory + "/output", String.class, Map.class,
            RDDMultipleTextOutputFormat.class);
}
public static class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {
    @Override
    protected String generateFileNameForKeyValue(A key, B value, String name) {
        return key.toString(); // + "/" + name;
    }

    @Override
    protected B generateActualValue(A key, B value) {
        Map<String, List<String>> map = (Map<String, List<String>>) value;
        for (Map.Entry<String, List<String>> entry : map.entrySet()) {
            generateFileNameForKeyValue((A) (key.toString() + "/" + entry.getKey()),
                    (B) entry.getValue().toString(), entry.getKey());
        }
        return value;
        //return value.saveAsHadoopFile((Map)value., String.class, Map.class,
        //        RDDMultipleTextOutputFormat.class);
    }

    @Override
    protected A generateActualKey(A key, B value) {
        return null; // suppress the key in the output line
    }

    /*@Override
    public RecordWriter<A, B> getRecordWriter(FileSystem fs, JobConf job, String name, Progressable prog) throws IOException {
        if (name.startsWith("apple")) {
            return new TextOutputFormat<A, B>().getRecordWriter(fs, job, name, prog);
        } else if (name.startsWith("banana")) {
            return new TextOutputFormat<A, B>().getRecordWriter(fs, job, name, prog);
        }
        return super.getRecordWriter(fs, job, name, prog);
    }*/
}
Any help is much appreciated.
Thanks, Akhila.