将键值数据库与spark集成

rnmwe5a2  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(397)

我很难理解spark如何与存储交互。
我想制作一个spark集群,从rocksdb数据库(或任何其他键值存储)获取数据。然而,此时此刻,我能做的最好的事情就是将数据库中的整个数据集提取到每个集群节点的内存中(例如,Map到一个Map中),并从该对象构建一个rdd。
我要怎么做才能只获取必要的数据(就像spark对hdfs所做的那样)?我读过hadoop输入格式和记录阅读器,但我还没有完全掌握应该实现什么。
我知道这是一个广泛的问题,但我真的很感谢一些帮助我开始。先谢谢你。

brgchamk

brgchamk1#

这里有一个可能的解决办法。我假设您有要访问的键值存储(在您的例子中是rocksdb)的客户机库。 KeyValuePair 表示一个bean类,它表示键值存储中的一个键值对。
班级

/*Lazy iterator to read from KeyValue store*/
class KeyValueIterator implements Iterator<KeyValuePair> {
    public KeyValueIterator() {
        //TODO initialize your custom reader using java client library
    }
    @Override
    public boolean hasNext() {
        //TODO
    }

    @Override
    public KeyValuePair next() {
        //TODO
    }
}
class KeyValueReader implements FlatMapFunction<KeyValuePair, KeyValuePair>() {
    @Override
    public Iterator<KeyValuePair> call(KeyValuePair keyValuePair) throws Exception {
        //ignore empty 'keyValuePair' object
        return new KeyValueIterator();
    }
}

创建键值rdd

/*list with a dummy KeyValuePair instance*/
ArrayList<KeyValuePair> keyValuePairs = new ArrayList<>();
keyValuePairs.add(new KeyValuePair());
JavaRDD<KeyValuePair> keyValuePairRDD = javaSparkContext.parallelize(keyValuePairs);
/*Read one key-value pair at a time lazily*/    
keyValuePairRDD = keyValuePairRDD.flatMap(new KeyValueReader());

注:
上面的解决方案创建了一个rdd,默认情况下有两个分区(其中一个是空的)。在应用任何转换之前增加分区 keyValuePairRDD 在执行者之间分配处理。增加分区的不同方法:

keyValuePairRDD.repartition(partitionCounts)
//OR
keyValuePairRDD.partitionBy(...)

相关问题