如何生成hfiles,然后用spark批量加载到hbase中?

7d7tgy0s  于 2021-05-29  发布在  Hadoop
关注(0)|答案(0)|浏览(262)

各位,我用spark实现hbase大容量负载的方法很多。例如opencore,scala load,但是,它们可以使用spark在本地主机上工作,而不是yarn。

Dataset<Row> sqlDF = spark.sql(sql);
JavaRDD<Row> rowJavaRDD = sqlDF.javaRDD();

JavaPairRDD<ImmutableBytesWritable, TreeSet<KeyValue>> pairRDD = rowJavaRDD.mapToPair(
            row->convertToKVs(row,fieldArray)//in method :TreeSet<KeyValue> kvSet = new TreeSet<>(KeyValue.COMPARATOR);
    );

pairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(1));

JavaPairRDD<ImmutableBytesWritable, KeyValue> cellRDD = pairRDD.flatMapToPair(row ->{
        List<Tuple2<ImmutableBytesWritable, KeyValue>> kvs = new ArrayList<>();
        TreeSet<KeyValue> kvSet = row._2;
        for (KeyValue keyValue : kvSet) {
            kvs.add(new Tuple2<>(row._1,keyValue));
        }
        return kvs.iterator();
    });

... job conf ...

HFileOutputFormat2.configureIncrementalLoad(job, table);
cellRDD.saveAsNewAPIHadoopFile(hFilePath,ImmutableBytesWritable.class,KeyValue.class,HFileOutputFormat2.class,job.getConfiguration());

当我使用 spark-submit --master yarn 要执行代码,异常 Added a key not lexically larger than previous. key=... 是引发的,我知道异常是由表列没有按词汇排序引起的,但是为什么即使我使用了 new TreeSet<>(KeyValue.COMPARATOR) 对列进行排序。
有人能告诉我怎么修吗?提前谢谢。
编辑:原因是pairdd具有相同的rowkey结果。通过distcp,我实现了跨不同hbase集群的大负载分布。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题