spark rdd take()和序列文件

qyswt5oh  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(455)

看起来像 RDD.take() 只要在sequencefile的支持下重复读取最后一个元素。
例如:

val rdd = sc.sequenceFile("records.seq", classOf[LongWritable], classOf[RecordWritable])
val records: Array[(LongWritable, RecordWritable)] = rdd.take(5)
System.out.println(records.map(_._2.toString).mkString("\n"))

输出:

Record(3.1, 2.5)
Record(3.1, 2.5)
Record(3.1, 2.5)
Record(3.1, 2.5)
Record(3.1, 2.5)

尽管我知道这些行是独一无二的。
这个问题也存在于 sc.binaryRecords() .
我意识到这可能与hadoop的可写缓存问题有关,但是有计划解决这个问题吗?有什么解决办法吗?

4xrmg8kj

4xrmg8kj1#

我尝试复制您的问题,是的,我也看到了类似的行为,当直接调用sc.sequencefile()的结果时。但却能找到工作:
注意:我解释使用longwritable和text而不是recordwritable。我不确定是否需要导入可写文件
我的序列文件有: (0,0) (1,1) (2,2) ... ```
val rdd = sc.sequenceFile("sequencefile.seq", classOf[LongWritable], classOf[Text])
val map = rdd.map(case (k,v) => (k.get(),v.toString()))
map.take(1);
res5: Array[(Long, String)] = Array((0,0))
map.take(5);
res4: Array[(Long, String)] = Array((0,0), (1,1), (2,2), (3,3), (4,4))

相关问题