我正在尝试通过 Storm 0.9.4 从 Redis 中获取数据。
redis中存储的数据:
users:temp 10,20,30,40,50
拓扑代码:
/**
 * Entry point that runs a single-spout topology on an in-process {@link LocalCluster}.
 *
 * <p>The topology consists only of a {@code StormRedisSpout} registered as
 * {@code "redisspout"}; no bolt is attached. It is submitted under the name {@code "test"},
 * left running for ten seconds, then killed and the cluster is shut down.
 */
public class StormRedisTopology {
    /** Not assigned or read by {@link #main}; that method works with its own local values. */
    public static String host, pattern;
    /** Not assigned or read by {@link #main}; that method works with its own local value. */
    public static Integer port;

    /**
     * Builds the topology, submits it to a local cluster, waits, and tears everything down.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Connection settings for the Redis server the spout subscribes to.
        final String redisHost = "192.168.161.226";
        final int redisPort = 6379;
        final String channelPattern = "users:temp";

        // Debug mode is switched on before the topology is submitted.
        final Config topologyConfig = new Config();
        topologyConfig.setDebug(true);

        final TopologyBuilder topologyBuilder = new TopologyBuilder();
        final StormRedisSpout redisSpout = new StormRedisSpout(redisHost, redisPort, channelPattern);
        topologyBuilder.setSpout("redisspout", redisSpout);

        final LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("test", topologyConfig, topologyBuilder.createTopology());

        // Give the topology ten seconds to run before stopping it.
        Utils.sleep(10000);

        localCluster.killTopology("test");
        localCluster.shutdown();
    }
}
Spout 代码:
public class StormRedisSpout extends BaseRichSpout {
private transient boolean spoutActive = true;
private SpoutOutputCollector _collector;
final String host,pattern;
final int port;
LinkedBlockingQueue<String> queue;
JedisPool pool;
public StormRedisSpout(String host, int port, String pattern) {
this.host = host;
this.port = port;
this.pattern = pattern;
}
class ListenerThread extends Thread{
LinkedBlockingQueue<String> queue;
JedisPool pool;
String pattern;
public ListenerThread(LinkedBlockingQueue<String> queue, JedisPool pool, String pattern) {
this.queue = queue;
this.pool = pool;
this.pattern = pattern;
}
public void run(){
log.info("Inside run method of redis spout");
JedisPubSub listener = new JedisPubSub() {
public void onMessage(String channel, String message) {
queue.offer(message);
}
public void onPMessage(String pattern, String channel, String message) {
queue.offer(message);
}
public void onPSubscribe(String channel, int subscribedChannels) {
}
public void onPUnsubscribe(String channel, int subscribedChannels) {
}
public void onSubscribe(String channel, int subscribedChannels) {
}
public void onUnsubscribe(String channel, int subscribedChannels) {
}
};
Jedis jedis = null;
try {
jedis = pool.getResource();
} catch (Exception e) {
e.printStackTrace();
}
try {
jedis.psubscribe(listener, pattern);
} finally {
pool.returnResource(jedis);
}
}
};
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
log.info("Inside open method of redis spout");
_collector = collector;
queue = new LinkedBlockingQueue<String>(1000);
pool = new JedisPool(host,port);
ListenerThread listener = new ListenerThread(queue,pool,pattern);
listener.start();
}
public void close() {
pool.destroy();
}
public void activate() {
spoutActive = true;
}
public void deactivate() {
spoutActive = false;
}
public void nextTuple() {
log.info("Inside nextTuple method of redis spout");
String ret = queue.poll();
if(ret!=null) {
_collector.emit(tuple(ret));
} else {
Utils.sleep(50);
}
}
private List<Object> tuple(String ret) {
return null;
}
public void ack(Object msgId) {
}
public void fail(Object msgId) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("message"));
}
public boolean isDistributed() {
return false;
}
}
请告诉我哪里做错了。
暂无答案!
目前还没有任何答案,快来回答吧!