多线程风暴螺栓并行

k0pti3hp  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(421)

用于测试的组件包括
Kafka从机器上读取文件的传送器,文件由1000行组成。

String sCurrentLine;
        br = new BufferedReader(new FileReader("D:\\jsonLogTest.txt"));
        while ((sCurrentLine = br.readLine()) != null) {
            //System.out.println(sCurrentLine);
            KeyedMessage<String, String> message =new KeyedMessage<String, String>(TOPIC,sCurrentLine);
            producer.send(message);
        }

storm consumer使用三个螺栓,螺栓一应接收流并将其分为两个不同的流(stream1和stream2)。bolttwo和boltthree应该订阅这些流(简单地说,我希望处理boltone parley中的元组,就像bolt2处理前500行和bollt处理最后500行一样。
拓扑学

builder.setSpout("line-reader-spout",kafkaSpout,1);
        builder.setBolt("bolt-one", new BoltOne(),1).shuffleGrouping("line-reader-spout");
        builder.setBolt("bolt-two", new BoltTwo(),1).shuffleGrouping("bolt-one","stream1");
        builder.setBolt("bolt-three", new BoltThree(),1).shuffleGrouping("bolt-one","stream2");

博尔通

collector.emit("stream1", new Values(input.getString(0)));
            collector.emit("stream2", new Values(input.getString(0)));
        x++;System.out.println("" + x);
        collector.ack(input);

public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // TODO Auto-generated method stub
        outputFieldsDeclarer.declareStream("stream1", new Fields("field1"));
        outputFieldsDeclarer.declareStream("stream2", new Fields("field2"));
    }

螺栓2和螺栓3

public void execute(Tuple input) {
        String sentence = input.getString(0);
        System.out.println("*********B2*************");

    }

堆栈跟踪


*********B2*************

1

*********B3*************

2

*********B2*************
*********B3*************

3

*********B3*************
*********B2*************

4

*********B3*************
*********B2*************

5

*********B2*************
*********B3*************

6

*********B2*************
*********B3*************

7

*********B3*************
*********B2*************

完全混淆了分裂流和并行。举个例子会有帮助。
我目前提出的最新解决方案:

public void execute(Tuple input) {
        @SuppressWarnings("unused")
        String sentence = input.getString(0);
        if (x%2==0) {
            collector.emit("stream1", new Values(input.getString(0)));
        }
        else{
            collector.emit("stream2", new Values(input.getString(0)));
        }

        x++;
        collector.ack(input);
    }

我只是在偶数-奇数的基础上划分流,处理时间变为一半,而bolttwo处理一个元组,而boltthree处理另一个元组。

prdp8dxp

prdp8dxp1#

我猜你用 LocalCluster . 由于有多个线程正在运行,因此通过 println(...) 不同步,内部缓冲会扰乱输出顺序。。。因此,您在中看到的东西并不可靠——订单只保存在一个喷口/螺栓中。
此外,你想得到什么样的行为?
现在,你有

Spout => Bolt1 =+=> Bolt2
                +=> Bolt3

即,bolt1的输出被复制,bolt2和bolt3都从bolt1接收所有输出元组。因此,bolt1从1到7计数,bolt1的每个输出元组触发 execute() 螺栓2和螺栓3。
由于bolt2和bolt3做同样的事情,我猜您希望有同一个bolt的两个副本,并将输入划分为两个。为此,只需添加一个螺栓并将“平行度”设置为2:

builder.setSpout("line-reader-spout",kafkaSpout,1);
builder.setBolt("bolt-one", new BoltOne(),1).shuffleGrouping("line-reader-spout");
builder.setBolt("bolt-two", new BoltTwo(),2).shuffleGrouping("bolt-one","stream1");

此外,bolt1只需要声明一个输出流(而不是两个)。如果您声明了多个输出流并对这两个流都进行了写入,那么您将复制数据。。。

相关问题