我有一个巨大的时间序列数据集。我想在数据上做reducebykey。我正在使用loadapi joinWithCassandraTable
然后做reducebykey。问题是spark将所有数据加载到内存中,然后执行reducebykey。我想减少内存,所以我试着加载小的(比如说24个样本中有4个),然后做reducebykey。然后加载后续数据并使用以前的rdd和do reducebykey。
示例代码:
def loadData(t1:Long, t2:Long): RDD[(String,Long)] =???
def ranges:List(Long,Long) = ???
ranges.aggregate(sc.emptyRDD[(String, Long)])(loadData, combineRDD)
val combineRDD: (RDD[(String, Long)],
RDD[(String, Long)]) =>
RDD[(String, Long)] = {
case (x, y) =>
x.union(y).reduceByKey(_+_)
}
但是上面的代码没有按预期工作。它创建了长的沿袭,并且仍然加载整个数据集。
尝试了此处提出的解决方案: sc.union
但这对记忆问题没有帮助。
引用:由于rdd沿袭较长而导致堆栈溢出
如何将rdd合并到先前创建的rdd中,然后执行reducebykey? EDIT:
我尝试使用localcheckpointapi,如这里所述。
由于rdd沿袭较长而导致堆栈溢出
但这需要很长时间。有没有更好的办法?
暂无答案!
目前还没有任何答案,快来回答吧!