redis-storm集成在redis-spout中出现异常

xcitsw88  于 2021-06-21  发布在  Storm
关注(0)|答案(0)|浏览(207)

试图通过 Storm 0.9.4 从 Redis 获取数据。
redis中存储的数据:
users:temp 10,20,30,40,50
拓扑代码:

/**
 * Local-mode driver that wires a single {@link StormRedisSpout} into a
 * topology, runs it on an in-process {@code LocalCluster} for a fixed
 * period, and then tears everything down.
 */
public class StormRedisTopology {

    // These static fields are never assigned or read inside this class;
    // main() uses its own local variables. They are kept because they are
    // part of the class's public surface.
    public static String host, pattern;

    public static Integer port;

    /**
     * Builds the topology, submits it under the name "test", waits via
     * {@code Utils.sleep(10000)}, then kills the topology and shuts the
     * local cluster down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Connection settings handed to the spout.
        final String redisHost = "192.168.161.226";
        final int redisPort = 6379;
        final String channelPattern = "users:temp";

        Config topologyConf = new Config();
        topologyConf.setDebug(true);

        TopologyBuilder topology = new TopologyBuilder();
        StormRedisSpout redisSpout = new StormRedisSpout(redisHost, redisPort, channelPattern);
        topology.setSpout("redisspout", redisSpout);

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("test", topologyConf, topology.createTopology());

        // Let the topology run for a while before tearing it down.
        Utils.sleep(10000);

        localCluster.killTopology("test");
        localCluster.shutdown();
    }
}

Spout 代码:

/**
 * Storm spout that subscribes to a Redis pub/sub pattern on a background
 * thread and emits each received message as a one-field tuple named
 * {@code "message"}.
 *
 * <p>Messages are handed from the listener thread to {@link #nextTuple()}
 * through a bounded queue of capacity 1000; {@code offer} is used, so a
 * message arriving while the queue is full is dropped.
 *
 * <p>NOTE: the spout uses {@code psubscribe}, so it only sees messages that
 * are PUBLISHed to a channel matching the pattern. Values merely stored
 * under a key of the same name are not delivered through this path.
 */
public class StormRedisSpout extends BaseRichSpout {

    // The class logs through `log` but never declared it. java.util.logging
    // is used so that no dependency beyond the JDK is introduced.
    private static final java.util.logging.Logger log =
            java.util.logging.Logger.getLogger(StormRedisSpout.class.getName());

    // Toggled by activate()/deactivate(); not read anywhere in this class.
    private transient boolean spoutActive = true;

    private SpoutOutputCollector _collector;

    final String host, pattern;

    final int port;

    // Both are created in open(), not in the constructor.
    LinkedBlockingQueue<String> queue;

    JedisPool pool;

    /**
     * @param host    Redis host name or address
     * @param port    Redis port
     * @param pattern channel pattern passed to {@code psubscribe}
     */
    public StormRedisSpout(String host, int port, String pattern) {
        this.host = host;
        this.port = port;
        this.pattern = pattern;
    }

    /**
     * Background thread that takes one connection from the pool and blocks in
     * {@code psubscribe}, pushing every received message onto the queue.
     */
    class ListenerThread extends Thread {

        LinkedBlockingQueue<String> queue;

        JedisPool pool;

        String pattern;

        public ListenerThread(LinkedBlockingQueue<String> queue, JedisPool pool, String pattern) {
            this.queue = queue;
            this.pool = pool;
            this.pattern = pattern;
        }

        public void run() {
            log.info("Inside run method of redis spout");

            JedisPubSub listener = new JedisPubSub() {

                public void onMessage(String channel, String message) {
                    queue.offer(message);
                }

                public void onPMessage(String pattern, String channel, String message) {
                    queue.offer(message);
                }

                public void onPSubscribe(String channel, int subscribedChannels) {
                }

                public void onPUnsubscribe(String channel, int subscribedChannels) {
                }

                public void onSubscribe(String channel, int subscribedChannels) {
                }

                public void onUnsubscribe(String channel, int subscribedChannels) {
                }
            };

            Jedis jedis = null;
            try {
                jedis = pool.getResource();
                jedis.psubscribe(listener, pattern);
            } catch (Exception e) {
                // Previously a failed getResource() was only printed and the
                // code went on to dereference a null connection. Report the
                // real cause and let the thread end.
                log.log(java.util.logging.Level.SEVERE,
                        "Redis subscription for pattern '" + pattern + "' failed", e);
            } finally {
                // Only hand back a connection that was actually obtained.
                if (jedis != null) {
                    pool.returnResource(jedis);
                }
            }
        }
    }

    /**
     * Stores the collector, creates the hand-off queue and the connection
     * pool, and starts the listener thread.
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        log.info("Inside open method of redis spout");
        _collector = collector;
        queue = new LinkedBlockingQueue<String>(1000);
        pool = new JedisPool(host, port);

        ListenerThread listener = new ListenerThread(queue, pool, pattern);
        // The thread blocks in psubscribe indefinitely; as a daemon it cannot
        // keep the JVM alive once the topology has been shut down.
        listener.setDaemon(true);
        listener.start();
    }

    /** Destroys the connection pool if {@link #open} created one. */
    public void close() {
        if (pool != null) {
            pool.destroy();
        }
    }

    public void activate() {
        spoutActive = true;
    }

    public void deactivate() {
        spoutActive = false;
    }

    /**
     * Emits the next queued message, or sleeps briefly when none is waiting.
     */
    public void nextTuple() {
        // Called repeatedly, so this is logged below INFO to avoid flooding.
        log.fine("Inside nextTuple method of redis spout");
        String ret = queue.poll();
        if (ret != null) {
            _collector.emit(tuple(ret));
        } else {
            Utils.sleep(50);
        }
    }

    /**
     * Wraps a message in the single-element value list matching the one
     * declared output field. This used to return {@code null}, which meant
     * {@code emit} was handed a null tuple for every message.
     *
     * @param ret the message taken from the queue
     * @return a mutable list containing exactly {@code ret}
     */
    private List<Object> tuple(String ret) {
        List<Object> values = new java.util.ArrayList<Object>(1);
        values.add(ret);
        return values;
    }

    public void ack(Object msgId) {
    }

    public void fail(Object msgId) {
    }

    /** Declares the single output field {@code "message"}. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }

    public boolean isDistributed() {
        return false;
    }
}

请告诉我哪里做错了。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题