各位,我用spark实现hbase大容量负载的方法很多。例如opencore,scala load,但是,它们可以使用spark在本地主机上工作,而不是yarn。
Dataset<Row> sqlDF = spark.sql(sql);
JavaRDD<Row> rowJavaRDD = sqlDF.javaRDD();
JavaPairRDD<ImmutableBytesWritable, TreeSet<KeyValue>> pairRDD = rowJavaRDD.mapToPair(
row->convertToKVs(row,fieldArray)//in method :TreeSet<KeyValue> kvSet = new TreeSet<>(KeyValue.COMPARATOR);
);
pairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(1));
JavaPairRDD<ImmutableBytesWritable, KeyValue> cellRDD = pairRDD.flatMapToPair(row ->{
List<Tuple2<ImmutableBytesWritable, KeyValue>> kvs = new ArrayList<>();
TreeSet<KeyValue> kvSet = row._2;
for (KeyValue keyValue : kvSet) {
kvs.add(new Tuple2<>(row._1,keyValue));
}
return kvs.iterator();
});
... job conf ...
HFileOutputFormat2.configureIncrementalLoad(job, table);
cellRDD.saveAsNewAPIHadoopFile(hFilePath,ImmutableBytesWritable.class,KeyValue.class,HFileOutputFormat2.class,job.getConfiguration());
当我使用 spark-submit --master yarn
要执行代码,异常 Added a key not lexically larger than previous. key=...
是引发的,我知道异常是由表列没有按词汇排序引起的,但是为什么即使我使用了 new TreeSet<>(KeyValue.COMPARATOR)
对列进行排序。
有人能告诉我怎么修吗?提前谢谢。
编辑:原因是pairdd具有相同的rowkey结果。通过distcp,我实现了跨不同hbase集群的大负载分布。
暂无答案!
目前还没有任何答案,快来回答吧!