I am currently trying to write a Spark job in Java that computes the integral of one column of a dataset.
The data looks like this:
DateTime             velocity (in km/h)  vehicle
2016-03-28 11:00:45  80                  A
2016-03-28 11:00:45  75                  A
2016-03-28 11:00:46  70                  A
2016-03-28 11:00:47  68                  A
2016-03-28 11:00:48  72                  A
2016-03-28 11:00:48  75                  A
...
2016-03-28 11:00:47  68                  B
2016-03-28 11:00:48  72                  B
2016-03-28 11:00:48  75                  B
To compute the distance (in km) for each row, I have to take the time difference between the current row and the next one and multiply it by the velocity. The result then has to be added to the previous row's result to get the "total distance" travelled up to that point.
This is what I have come up with so far. But it processes one whole vehicle per map task, and there can be millions of records per vehicle....
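For a single pair of consecutive rows, the step described above is just elapsed time times velocity. A minimal plain-Java sketch (the helper name is mine; it assumes a left Riemann sum, i.e. the earlier row's velocity is charged for the whole interval):

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class DistanceStep {
    // Distance covered between two samples, charging the earlier sample's
    // velocity for the whole elapsed interval (a left Riemann sum).
    static double distanceKm(LocalDateTime t0, double velocityKmh, LocalDateTime t1) {
        double seconds = Duration.between(t0, t1).getSeconds();
        return velocityKmh * seconds / 3600.0; // km/h * s -> km
    }

    public static void main(String[] args) {
        LocalDateTime a = LocalDateTime.parse("2016-03-28T11:00:45");
        LocalDateTime b = LocalDateTime.parse("2016-03-28T11:00:48");
        // 3 seconds at 80 km/h
        System.out.println(distanceKm(a, 80.0, b));
    }
}
```

The running total per vehicle is then just the prefix sum of these per-step distances.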
final JavaRDD<String[]> input = sc.parallelize(Arrays.asList(
        new String[]{"2016-03-28", "11:00", "80", "VIN1"},
        new String[]{"2016-03-28", "11:00", "60", "VIN1"},
        new String[]{"2016-03-28", "11:00", "50", "VIN1"},
        new String[]{"2016-03-28", "11:01", "80", "VIN1"},
        new String[]{"2016-03-28", "11:05", "80", "VIN1"},
        new String[]{"2016-03-28", "11:09", "80", "VIN1"},
        new String[]{"2016-03-28", "11:00", "80", "VIN2"},
        new String[]{"2016-03-28", "11:01", "80", "VIN2"}
));
// grouping by vehicle and date:
final JavaPairRDD<String, Iterable<String[]>> byVinAndDate = input.groupBy(new Function<String[], String>() {
    @Override
    public String call(String[] record) throws Exception {
        return record[0] + record[3]; // date + vin
    }
});
// mapping each "value" (all records matching one key) to a result record
final JavaRDD<String[]> result = byVinAndDate.mapValues(new Function<Iterable<String[]>, String[]>() {
    @Override
    public String[] call(Iterable<String[]> records) throws Exception {
        final Iterator<String[]> iterator = records.iterator();
        String[] previousRecord = iterator.next();
        while (iterator.hasNext()) {
            final String[] record = iterator.next();
            // calculate the time difference between the current and the previous
            // record, multiply it by the velocity, and append the running total
            // to a list (pseudocode -- this accumulation is the part I am stuck on)
            previousRecord = record;
        }
        return new String[]{
                previousRecord[0],
                previousRecord[1],
                previousRecord[2],
                previousRecord[3],
                // look up the accumulated distance for the last record (pseudocode)
                NewList.get(previousRecord[0] + previousRecord[1] + previousRecord[3])
        };
    }
}).values();
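The sequential per-group logic the `mapValues` above is aiming at can be sketched in plain Java (class and field names are mine; timestamps are assumed to be epoch seconds and velocity km/h):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class GroupDistance {
    // one parsed record of a group: timestamp in epoch seconds, velocity in km/h
    static final class Sample {
        final long epochSeconds;
        final double velocityKmh;
        Sample(long epochSeconds, double velocityKmh) {
            this.epochSeconds = epochSeconds;
            this.velocityKmh = velocityKmh;
        }
    }

    // Sort one vehicle's samples by time and accumulate the running distance:
    // each step contributes (time delta) * (previous sample's velocity).
    static List<Double> runningDistanceKm(List<Sample> samples) {
        List<Sample> sorted = new ArrayList<>(samples);
        sorted.sort(Comparator.comparingLong(s -> s.epochSeconds));
        List<Double> totals = new ArrayList<>();
        double total = 0.0;
        for (int i = 0; i < sorted.size(); i++) {
            if (i > 0) {
                double dtSeconds = sorted.get(i).epochSeconds - sorted.get(i - 1).epochSeconds;
                total += sorted.get(i - 1).velocityKmh * dtSeconds / 3600.0;
            }
            totals.add(total);
        }
        return totals;
    }
}
```

Inside Spark this would run once per group, which is exactly the sequential dependency the question is about: the running total of row *n* needs the total of row *n − 1*.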
I have no idea how to express this as map/reduce transformations without losing the benefits of distributed computation.
I know this goes against the nature of MR and Spark, but any suggestion on how to chain data rows together, or on how to solve this in an elegant way, would be very helpful :)
Thanks!
20 Answers

Using the DataFrame analytics API, compute time windows based on a partition over the date and id columns, keeping the number of seconds between those windows (intermediate columns: data, id, time, avg_velocity, rank). It will output:

+----------+----+------------------+
|2016-03-28|VIN1|11.722222222222223|
|2016-03-28|VIN2|1.3333333333333335|
+----------+----+------------------+