spark-为一个具有多列的行键创建hfile

g52tjvyc  于 2021-06-08  发布在  Hbase
关注(0)|答案(2)|浏览(401)
JavaRDD<String> hbaseFile = jsc.textFile(HDFS_MASTER+HBASE_FILE);
JavaPairRDD<ImmutableBytesWritable, KeyValue> putJavaRDD = hbaseFile.mapToPair(line -> convertToKVCol1(line, COLUMN_AGE));
putJavaRDD.sortByKey(true);
putJavaRDD.saveAsNewAPIHadoopFile(stagingFolder, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, conf);

private static Tuple2<ImmutableBytesWritable, KeyValue> convertToKVCol1(String beanString, byte[] column) {
    InspurUserEntity inspurUserEntity = gson.fromJson(beanString, InspurUserEntity.class);
    String rowKey = inspurUserEntity.getDepartment_level1()+"_"+inspurUserEntity.getDepartment_level2()+"_"+inspurUserEntity.getId();
    return new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(rowKey)),
            new KeyValue(Bytes.toBytes(rowKey), COLUMN_FAMILY, column, Bytes.toBytes(inspurUserEntity.getAge())));
}

上面是我的代码,它只适用于行键的单个列。有没有办法为一个行键创建一个包含多个列的hfile?

eoxn13cs

eoxn13cs1#

您可以创建多个 Tuple2<ImmutableBytesWritable, KeyValue> 对于一行,其中键保持不变 KeyValue s表示单个单元格值。确保你的专栏也按字典顺序排列。所以你应该调用 saveAsNewAPIHadoopFileJavaPairRDD<ImmutableBytesWritable, KeyValue> .

final JavaPairRDD<ImmutableBytesWritable, KeyValue> writables = myRdd.flatMapToPair(record -> {
     final List<Tuple2<ImmutableBytesWritable, KeyValue>> listToReturn = new ArrayList<>();
     // Add first column to the collection
     listToReturn.add(new Tuple2<ImmutableBytesWritable, KeyValue>(
                            new ImmutableBytesWritable(Bytes.toBytes(record.getRowKey())),
                            new KeyValue(Bytes.toBytes(record.getRowKey()), Bytes.toBytes("CF"),
                                    Bytes.toBytes("COL1"), System.currentTimeMillis(),
                                    Bytes.toBytes(record.getCol1()))));
    // Add subsequent columns
    listToReturn.add(new Tuple2<ImmutableBytesWritable, KeyValue>(
                            new ImmutableBytesWritable(Bytes.toBytes(record.getRowKey())),
                            new KeyValue(Bytes.toBytes(record.getRowKey()), Bytes.toBytes("CF"),
                                    Bytes.toBytes("COL2"), System.currentTimeMillis(),
                                    Bytes.toBytes(record.getCol2()))));
});

注意:这是一个主要的问题,您必须添加您的列到rdd词典以及。
从本质上说,这种组合:行键+列族+列限定符应该在处理推出hfiles之前进行排序。

z0qdvdin

z0qdvdin2#

必须在声明中使用数组而不是immutablebyteswritable。

相关问题