java—storm中使用2个rabbitmq队列的经典字数拓扑

kwvwclae  于 2021-06-24  发布在  Storm
关注(0)|答案(0)|浏览(237)

我必须用java和storm编写一个简单的“单词计数”拓扑。特别是,我有一个生成csv(逗号分隔)字符串的外部数据源
丹尼尔,0.5654144543,用户,899898,评论,,,
这些字符串被插入到名为“input”的rabbitmq队列中。这个数据源工作得很好,我可以看到队列中的字符串。
现在,我修改了经典的拓扑结构,添加了rabbitmqpout。目标是对每个csv行的第一个字段进行字数统计,并将结果发布到名为“output”的新队列中。问题是,我在新队列中看不到任何元组,但拓扑已提交并正在运行。
总结一下:
外部数据源将项放入输入队列
rabbitmqspout从输入队列获取项并将它们插入到拓扑中
已执行经典字数统计拓扑
最后一个螺栓将结果放入输出队列
问题:我可以看到输入队列中的项,但在输出中看不到任何项,即使我使用相同的方法将项发送到外部数据源(并且它可以工作)和rabbitmqexporter(不工作…)中的队列中
下面是一些代码
兔子嘴

public class RabbitMQSpout extends BaseRichSpout {

    public static final String DATA = "data";

    private SpoutOutputCollector _collector;
    private RabbitMQManager rabbitMQManager;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        _collector = _collector;
        rabbitMQManager = new RabbitMQManager("localhost", "rabbitmq", "rabbitmq", "test");
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);

        String data = rabbitMQManager.receive("input");

        if (data != null) {
            _collector.emit(new Values(data));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(DATA));
    }
}

对拉螺栓

public class SplitBolt extends BaseRichBolt {

    private OutputCollector _collector;

    public SplitSentenceBolt() { }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this._collector = collector;
        this.SPACE = Pattern.compile(",");
    }

    @Override
    public void execute(Tuple input) {
        String sentence = input.getStringByField(RabbitMQSpout.DATA);
        String[] words = sentence.split(",");

        if (words.length > 0)
            _collector.emit(new Values(words[0]));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

}

字计数螺栓

public class WordCountBolt extends BaseBasicBolt {

    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
        count++;
        counts.put(word, count);
        System.out.println(count);
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

兔子螺栓

public RabbitMQExporterBolt(String rabbitMqHost, String rabbitMqUsername, String rabbitMqPassword,
                                String defaultQueue) {
        super();
        this.rabbitMqHost = rabbitMqHost;
        this.rabbitMqUsername = rabbitMqUsername;
        this.rabbitMqPassword = rabbitMqPassword;
        this.defaultQueue = defaultQueue;
    }

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

        this.collector=outputCollector;
        this.rabbitmq = new RabbitMQManager(rabbitMqHost, rabbitMqUsername, rabbitMqPassword, defaultQueue);

    }

    @Override
    public void execute(Tuple tuple) {

        String word = tuple.getString(0);
        Integer count = tuple.getInteger(1);

        String output = word + " " + count;
        rabbitmq.send(output);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }

}

拓扑学

public class WordCountTopology {

    private static final String RABBITMQ_HOST = "rabbitmq";
    private static final String RABBITMQ_USER = "rabbitmq";
    private static final String RABBITMQ_PASS = "rabbitmq";
    private static final String RABBITMQ_QUEUE = "output";

    public static void main(String[] args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new RabbitMQSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 1)
                .shuffleGrouping("spout");

        builder.setBolt("count", new WordCountBolt(), 1)
                .fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {

            builder.setBolt("exporter",
                    new RabbitMQExporterBolt(
                            RABBITMQ_HOST, RABBITMQ_USER,
                            RABBITMQ_PASS, RABBITMQ_QUEUE ),
                    1)
               .shuffleGrouping("count");

            conf.setNumWorkers(3);

            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());

        } else {

            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();

        }
    }

}

兔子经理

public class RabbitMQManager {

    private String host;
    private String username;
    private String password;
    private ConnectionFactory factory;
    private Connection connection;

    private String defaultQueue;

    public RabbitMQManager(String host, String username, String password, String queue) {
        super();
        this.host = host;
        this.username = username;
        this.password = password;

        this.factory = null;
        this.connection = null;
        this.defaultQueue = queue;

        this.initialize();
        this.initializeQueue(defaultQueue);

    }

    private void initializeQueue(String queue){

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(username);
        factory.setPassword(password);
        Connection connection;
        try {
            connection = factory.newConnection();
            Channel channel = connection.createChannel();

            boolean durable = false;
            boolean exclusive = false; 
            boolean autoDelete = false;

            channel.queueDeclare(queue, durable, exclusive, autoDelete, null);

            channel.close();
            connection.close();

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }

    }

    private void initialize(){

        factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setUsername(username);
        factory.setPassword(password);
        try {

            connection = factory.newConnection();

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    public void terminate(){

        if (connection != null && connection.isOpen()){
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    private boolean reopenConnectionIfNeeded(){

        try {

            if (connection == null){
                connection = factory.newConnection();
                return true;
            }

            if (!connection.isOpen()){
                connection = factory.newConnection();
            }

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            return false;
        }

        return true;

    }

    public boolean send(String message){
        return this.send(defaultQueue, message);
    }

    public boolean send(String queue, String message){

        try {

            reopenConnectionIfNeeded();
            Channel channel = connection.createChannel();
            channel.basicPublish("", queue, null, message.getBytes());
            channel.close();

            return true;

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }

        return false;

    }

    public String receive(String queue) {

        try {
            reopenConnectionIfNeeded();
            Channel channel = connection.createChannel();
            Consumer consumer = new DefaultConsumer(channel);
            return channel.basicConsume(queue, true, consumer);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题