spark javapairdd迭代

zdwk9cvp  于 2021-06-21  发布在  Pig
关注(0)|答案(3)|浏览(279)

如何在javapairdd上迭代。我做了一个group by,得到了一个rdd,如下javapairdd(tuple7字符串集和对象列表)
现在我必须迭代这个rdd,并对pig中的每个rdd进行类似的计算。基本上我想迭代键和值列表并执行一些操作,然后返回一个javapairdd?

JavaPairRDD<Tuple7<String, String,String,String,String,String,String>, List<Records>> sizes =     
piTagRecordData.groupBy( new Function<Records, Tuple7<String, String,String,String,String,String,String>>() {
        private static final long serialVersionUID = 2885738359644652208L;
        @Override
        public Tuple7<String, String,String,String,String,String,String> call(Records row) throws Exception {
            Tuple7<String, String,String,String,String,String,String> compositeKey = new Tuple7<String, String, String, String, String, String, String>(row.getAsset_attribute_id(),row.getDate_time_value(),row.getOperation(),row.getPi_tag_count(),row.getAsset_id(),row.getAttr_name(),row.getCalculation_type());  
            return compositeKey;
        }
    });

在这之后,我想为大小(javapairdd)的每个成员执行一个操作——比如

rejected_records = FOREACH sizes GENERATE FLATTEN(Java function on the List of Records based on the group key

我正在使用spark 0.9.0

e4yzc0pl

e4yzc0pl1#

你可以用 void foreach(VoidFunction<T> f) 方法。更多信息和方法:https://spark.apache.org/docs/1.1.0/api/java/org/apache/spark/api/java/javarddlike.html#foreach(org.apache.spark.api.java.function.voidfunction)

rwqw0loc

rwqw0loc2#

尽管你说的是“为每个人”,但听起来你真的想要 flatMap 操作,因为您希望生成新值并将其展平。这可用于JavaRDD,包括 JavaPairRDD .

3vpjnl9f

3vpjnl9f3#

如果您想查看javapairdd的一些值,我会这样做

for (Tuple2<String, String> test : pairRdd.take(10)) //or pairRdd.collect()
           {
               System.out.println(test._1);
               System.out.println(test._2);
          }

note:tuple2 (假设javapairdd中有字符串),根据存储在javapairdd中的数据类型更改数据类型。

相关问题