side-join中的java-bloom过滤器

fgw7neuy  于 2021-06-04  发布在  Hadoop
关注(0)|答案(1)|浏览(381)

我正在探索布卢姆过滤器。我浏览了大多数BloomFitlers的博客,知道什么是,但仍然不能找出一个例子,以防加入。
每篇文章都说它会减少网络i/o,但没有一篇文章说明如何减少网络i/o?特别的一个很好http://vanjakom.wordpress.com/tag/distributed-cache/ 但它似乎和我刚开始使用map reduce一样复杂。
有谁能帮我在下面的例子中实现bloomfilter(reduceside-join)吗
2个Map器读取用户记录和部门记录并加入
用户记录
身份证,姓名
3738,里奇·戈尔
12946,罗尼·萨姆
17556年,大卫加特
3443,雷切尔·史密斯
5799,保罗·罗斯塔
部门记录
3738,销售
12946,市场营销
17556,市场营销
3738,销售
3738,销售
代码

public class UserMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>{

 private Text outkey = new Text();
 private Text outval = new Text();
 private String id, name;

public void map (LongWritable key, Text value, OutputCollector<Text, Text> ouput,Reporter reporter)
             throws IOException {

     String line = value.toString();
     String arryUsers[] = line.split(",");
     id = arryUsers[0].trim();
     name = arryUsers[1].trim();

     outkey.set(id);
     outval.set("A"+ name);
     ouput.collect(outkey, outval);
   }
    }

public class DepartMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

private Text Outk = new Text();
private Text Outv = new Text();
String depid, dep ;

public void map (LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

    String line = value.toString();
    String arryDept[] = line.split(",");
    depid = arryDept[0].trim();
    dep = arryDept[1].trim();

    Outk.set(depid);
    Outv.set("B" + dep);

    output.collect(Outk, Outv);
}
    }

减速器

ublic class JoinReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

private Text tmp = new Text();
private ArrayList<Text> listA = new ArrayList<Text>();
private ArrayList<Text> listB = new ArrayList<Text>();

public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text>output, Reporter reporter) throws IOException {

    listA.clear();
    listB.clear();

    while (values.hasNext()) {

        tmp = values.next();
        if (tmp.charAt(0) == 'A') {
            listA.add(new Text(tmp.toString().substring(1)));
        } else if (tmp.charAt(0) == 'B') {
            listB.add(new Text(tmp.toString().substring(1)));
        }

    }
    executejoinlogic(output);

}

private void executejoinlogic(OutputCollector<Text, Text> output) throws IOException {

    if (!listA.isEmpty() && !listB.isEmpty()) {
        for (Text A : listA) {
        for (Text B : listB) {
        output.collect(A, B);
        }
        }
         }
    }
          }

在上述场景中是否可以实现bloomfilter?
如果是,那么请帮助我实现这一点?

e0bqpujr

e0bqpujr1#

只有当两个输入表中的一个比另一个小得多时,才能在这里实现bloom过滤器。您需要遵循的流程如下:
在中初始化bloom过滤器 setup() Map器类的方法(筛选器对象本身应该是全局的,以便 map() 方法(稍后): filter = new BloomFilter(VECTOR_SIZE,NB_HASH,HASH_TYPE); 把较小的表格读入表格 setup() Map器的方法。
将每个记录的id添加到bloom筛选器: filter.add(ID);map() 方法本身,使用 filter.membershipTest(ID) 在任何来自较大输入源的ID上。如果没有匹配项,您就知道id不在较小的数据集中,因此不应该传递给reducer。
记住,在减速机中会出现误报,所以不要假设所有的东西都会连接起来。

相关问题