如何在javapairdd上迭代。我做了一个group by,得到了一个rdd,如下javapairdd(tuple7字符串集和对象列表)
现在我必须迭代这个rdd,并对pig中的每个rdd进行类似的计算。基本上我想迭代键和值列表并执行一些操作,然后返回一个javapairdd?
JavaPairRDD<Tuple7<String, String,String,String,String,String,String>, List<Records>> sizes =
piTagRecordData.groupBy( new Function<Records, Tuple7<String, String,String,String,String,String,String>>() {
private static final long serialVersionUID = 2885738359644652208L;
@Override
public Tuple7<String, String,String,String,String,String,String> call(Records row) throws Exception {
Tuple7<String, String,String,String,String,String,String> compositeKey = new Tuple7<String, String, String, String, String, String, String>(row.getAsset_attribute_id(),row.getDate_time_value(),row.getOperation(),row.getPi_tag_count(),row.getAsset_id(),row.getAttr_name(),row.getCalculation_type());
return compositeKey;
}
});
在这之后,我想为大小(javapairdd)的每个成员执行一个操作——比如
rejected_records = FOREACH sizes GENERATE FLATTEN(Java function on the List of Records based on the group key
我正在使用spark 0.9.0
3条答案
按热度按时间e4yzc0pl1#
你可以用
void foreach(VoidFunction<T> f)
方法。更多信息和方法:https://spark.apache.org/docs/1.1.0/api/java/org/apache/spark/api/java/javarddlike.html#foreach(org.apache.spark.api.java.function.voidfunction)rwqw0loc2#
尽管你说的是“为每个人”,但听起来你真的想要
flatMap
操作,因为您希望生成新值并将其展平。这可用于JavaRDD,包括JavaPairRDD
.3vpjnl9f3#
如果您想查看javapairdd的一些值,我会这样做
note:tuple2 (假设javapairdd中有字符串),根据存储在javapairdd中的数据类型更改数据类型。