我是spark的新手,遇到了一个问题。我正在处理一个用textFile()生成的RDD,它是一个csv文件。对于每一行,我想返回多行到一个新的RDD(一个而不是多个)。这是我的代码:
JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("LinearAccelerationEvent");
}
}).map(
new Function<String, LinearAccelerationEvent>() {
public LinearAccelerationEvent call(String line) throws Exception {
String[] fields = line.split(",");
LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
return linearAccelerationEvent;
}
}).cache();
我在这里所做的是过滤初始csv以仅获得LinearAccelerationEvent,然后我想将这些对象Map到LinearAccelerationEvent类并生成一个新的LinearAccelerationEvent对象的RDD。对于初始csv文件的每一行,我必须生成多个LinearAccelerometerEvent对象,但我不知道如何操作。我之所以要这么做,是因为以后这个RDD会这样推送给cassandra:
javaFunctions(linearAccelerationEventJavaRDD).writerBuilder("d300ea832fe462598f473f76939452283de495a1", "linearaccelerationevent", mapToRow(LinearAccelerationEvent.class)).saveToCassandra();
因此,理想的解决方案应该是:
JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("LinearAccelerationEvent");
}
}).map(
new Function<String, LinearAccelerationEvent>() {
public LinearAccelerationEvent call(String line) throws Exception {
String[] fields = line.split(",");
for() {
LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
return linearAccelerationEvent;
}
}
}).cache();
我可以使用foreachPartition()
函数并将for循环的每个事件推送到Cassandra,但我发现这种方法要慢得多。是否可以不使用foreach来做我想做的事情?谢谢
1条答案
按热度按时间osh3o9ms1#
如果我没理解错的话,返回一个
LinearAccelerationEvent
的集合(例如List)并调用flatMap
而不是map
,这将在每个加速事件的结果RDD中产生一个值。flatMap与调用map后再调用flatten是一样的。如果你熟悉Hive,它类似于使用HiveQL中的explode DTF。