java—将javapairdd< string,map< string,list< string>>存储到多个文件中

mzsu5hc0  于 2021-05-29  发布在  Hadoop
关注(0)|答案(0)|浏览(326)

我正在使用spark 1.6并试图解决以下问题。我有一个 JavaPairRDD<String, Map<String, List<String>> . 我想将其存储为多个输出文件,基于javapairdd的键是外部目录,map的键是文件名。
例如,如果javapairdd具有以下数据

<"A", <{"A1",["a1","b1","c1"]}, {"A2",["a2","b2","c2"]}>>
<"B", <{"B1",["bb1","bb2","bb3"]}>

那么输出文件夹应该如下所示

/output/A/A1 (content of A1 should have [a1,b1,c1])
/output/A/A2 (content of A2 should have [a2,b2,c2])
/output/B/B1 (content of B1 should have [bb1,bb2,bb3])

我有下面的代码,但我不确定如何更改multipletextoutputformat以遍历值Map。

public static void main(String a[]) {
            JavaPairRDD<String, Map<String, List<String>> pair;
            pair.saveAsHadoopFile(directory + "/output", String.class, Map.class,
                            RDDMultipleTextOutputFormat.class);
        }

public static class RDDMultipleTextOutputFormat<A, B> extends   MultipleTextOutputFormat<A, B> {
        @Override
        protected String generateFileNameForKeyValue(A key, B value, String name) {
            return key.toString(); // + "/" + name;
        }

        @Override
        protected B generateActualValue(A key, B value) {
            //return value;
            Map<String, List<String>> map = (HashMap<String, List<String>>)value;
            for(Map.Entry<String, List<String>>entry: map.entrySet()) {
                generateFileNameForKeyValue((A)(key.toString() + "/" + entry.getKey()), (B)(entry.getValue().toString()), entry.getKey());

            }

            //return value.saveAsHadoopFile((Map)value., String.class, Map.class,
            //  RDDMultipleTextOutputFormat.class);
       }

       @Override
           protected A generateActualKey(A key, B value) {
               return null;
           }

       /*@Override
           public RecordWriter<A, B> getRecordWriter(FileSystem fs, JobConf job, String name, Progressable prog) throws IOException {
               if (name.startsWith("apple")) {
                   return new TextOutputFormat<A, B>().getRecordWriter(fs, job, name, prog);
               } else if (name.startsWith("banana")) {
                   return new TextOutputFormat<A, B>().getRecordWriter(fs, job, name, prog);
               }
               return super.getRecordWriter(fs, job, name, prog);
           }*/
      }

非常感谢您的帮助。
谢谢阿基拉。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题