我需要用 Java 和 Storm 编写一个简单的“单词计数”拓扑。具体来说,我有一个外部数据源,它会生成 CSV(逗号分隔)字符串,例如:
丹尼尔,0.5654144543,用户,899898,评论,,,
这些字符串被插入到名为“input”的rabbitmq队列中。这个数据源工作得很好,我可以看到队列中的字符串。
现在,我修改了经典的单词计数拓扑,加入了 RabbitMQSpout。目标是对每个 CSV 行的第一个字段进行计数,并将结果发布到名为“output”的新队列中。问题是:拓扑已经提交并正在运行,但我在新队列中看不到任何元组。
总结一下:
外部数据源将项放入输入队列
RabbitMQSpout 从输入队列获取项并将它们送入拓扑
已执行经典字数统计拓扑
最后一个 bolt 将结果放入输出队列
问题:我可以看到输入队列中的项,但在输出队列中看不到任何项。外部数据源(可以正常工作)和 RabbitMQExporterBolt(不工作……)使用的是同一个方法向队列发送项。
下面是一些代码
RabbitMQSpout
/**
 * Spout that polls the RabbitMQ queue {@code "input"} and emits every
 * received payload as a single-field tuple (field name {@link #DATA}).
 *
 * <p>Tuples are emitted without a message id, so they are not tracked for
 * replay by Storm.
 */
public class RabbitMQSpout extends BaseRichSpout {
    /** Name of the only output field; it carries the raw message text. */
    public static final String DATA = "data";

    private SpoutOutputCollector _collector;
    private RabbitMQManager rabbitMQManager;

    /**
     * Stores the collector handed in by Storm and opens the RabbitMQ connection.
     *
     * @param map                  topology configuration (unused)
     * @param topologyContext      topology context (unused)
     * @param spoutOutputCollector collector used by {@link #nextTuple()} to emit
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        // The original assigned the field to itself, leaving it null and making
        // every emit in nextTuple() fail with a NullPointerException.
        _collector = spoutOutputCollector;
        rabbitMQManager = new RabbitMQManager("localhost", "rabbitmq", "rabbitmq", "test");
    }

    /**
     * Waits one second, then tries to read one message from the "input" queue
     * and emits it when one was returned.
     */
    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        String data = rabbitMQManager.receive("input");
        if (data != null) {
            _collector.emit(new Values(data));
        }
    }

    /** Releases the RabbitMQ connection when the spout is shut down. */
    @Override
    public void close() {
        if (rabbitMQManager != null) {
            rabbitMQManager.terminate();
        }
    }

    /** Declares the single output field {@link #DATA}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(DATA));
    }
}
拆分 bolt
public class SplitBolt extends BaseRichBolt {
private OutputCollector _collector;
public SplitSentenceBolt() { }
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this._collector = collector;
this.SPACE = Pattern.compile(",");
}
@Override
public void execute(Tuple input) {
String sentence = input.getStringByField(RabbitMQSpout.DATA);
String[] words = sentence.split(",");
if (words.length > 0)
_collector.emit(new Values(words[0]));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
WordCountBolt
/**
 * Bolt that keeps a running count per word in memory and emits the updated
 * {@code (word, count)} pair for every incoming tuple.
 */
public class WordCountBolt extends BaseBasicBolt {
    /** Running totals, keyed by word. */
    Map<String, Integer> counts = new HashMap<String, Integer>();

    /**
     * Increments the counter of the word found at position 0 of the tuple,
     * prints the new total to standard output and emits it.
     *
     * @param tuple     incoming tuple; its first value is read as the word
     * @param collector collector used to emit the {@code (word, count)} pair
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        final String key = tuple.getString(0);
        final Integer previous = counts.get(key);
        // A word seen for the first time starts at 1.
        final Integer updated = (previous == null) ? 1 : previous + 1;
        counts.put(key, updated);
        System.out.println(updated);
        collector.emit(new Values(key, updated));
    }

    /** Declares the two output fields {@code "word"} and {@code "count"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields schema = new Fields("word", "count");
        declarer.declare(schema);
    }
}
RabbitMQExporterBolt
/**
 * Records the RabbitMQ connection settings; nothing is connected here.
 *
 * @param rabbitMqHost     broker host name
 * @param rabbitMqUsername broker user name
 * @param rabbitMqPassword broker password
 * @param defaultQueue     queue name stored as this bolt's default queue
 */
public RabbitMQExporterBolt(String rabbitMqHost, String rabbitMqUsername, String rabbitMqPassword,
        String defaultQueue) {
    // The superclass no-arg constructor is invoked implicitly.
    this.defaultQueue = defaultQueue;
    this.rabbitMqPassword = rabbitMqPassword;
    this.rabbitMqUsername = rabbitMqUsername;
    this.rabbitMqHost = rabbitMqHost;
}
/**
 * Keeps the collector supplied by Storm and creates the RabbitMQ manager
 * from the settings captured by the constructor.
 *
 * @param map             topology configuration (unused)
 * @param topologyContext topology context (unused)
 * @param outputCollector collector supplied by Storm
 */
@Override
public void prepare(@SuppressWarnings("rawtypes") Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    collector = outputCollector;
    rabbitmq = new RabbitMQManager(
            rabbitMqHost,
            rabbitMqUsername,
            rabbitMqPassword,
            defaultQueue);
}
/**
 * Formats the incoming {@code (word, count)} pair as {@code "word count"} and
 * publishes it through the RabbitMQ manager.
 *
 * <p>The tuple is acked when the manager reports success and failed
 * otherwise, so a publish error is no longer silently ignored.
 *
 * @param tuple incoming tuple; value 0 is read as the word, value 1 as the count
 */
@Override
public void execute(Tuple tuple) {
    String word = tuple.getString(0);
    Integer count = tuple.getInteger(1);
    String output = word + " " + count;
    if (rabbitmq.send(output)) {
        collector.ack(tuple);
    } else {
        collector.fail(tuple);
    }
}
/**
 * Declares a single output field named {@code "word"}.
 *
 * <p>NOTE(review): no emit call is visible in this bolt's execute method, so
 * this declaration looks unused — confirm before relying on it.
 */
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    Fields declared = new Fields("word");
    outputFieldsDeclarer.declare(declared);
}
}
拓扑
/**
 * Entry point that wires spout -> split -> count and either submits the
 * topology to a cluster (when a topology name is passed as the first
 * argument) or runs it in a local cluster for ten seconds.
 *
 * <p>The exporter bolt is attached only in the cluster branch.
 */
public class WordCountTopology {
    private static final String RABBITMQ_HOST = "rabbitmq";
    private static final String RABBITMQ_USER = "rabbitmq";
    private static final String RABBITMQ_PASS = "rabbitmq";
    private static final String RABBITMQ_QUEUE = "output";

    /**
     * Builds and launches the topology.
     *
     * @param args optional; {@code args[0]} is used as the topology name for
     *             cluster submission
     * @throws Exception propagated from topology submission or the local run
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RabbitMQSpout(), 1);
        builder.setBolt("split", new SplitSentenceBolt(), 1).shuffleGrouping("spout");
        builder.setBolt("count", new WordCountBolt(), 1).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        boolean runLocally = (args == null) || (args.length == 0);
        if (runLocally) {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            // Let the topology run for ten seconds before tearing it down.
            Thread.sleep(10000);
            cluster.shutdown();
            return;
        }

        RabbitMQExporterBolt exporter =
                new RabbitMQExporterBolt(RABBITMQ_HOST, RABBITMQ_USER, RABBITMQ_PASS, RABBITMQ_QUEUE);
        builder.setBolt("exporter", exporter, 1).shuffleGrouping("count");
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
}
RabbitMQManager
public class RabbitMQManager {
private String host;
private String username;
private String password;
private ConnectionFactory factory;
private Connection connection;
private String defaultQueue;
public RabbitMQManager(String host, String username, String password, String queue) {
super();
this.host = host;
this.username = username;
this.password = password;
this.factory = null;
this.connection = null;
this.defaultQueue = queue;
this.initialize();
this.initializeQueue(defaultQueue);
}
private void initializeQueue(String queue){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setUsername(username);
factory.setPassword(password);
Connection connection;
try {
connection = factory.newConnection();
Channel channel = connection.createChannel();
boolean durable = false;
boolean exclusive = false;
boolean autoDelete = false;
channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
channel.close();
connection.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
private void initialize(){
factory = new ConnectionFactory();
factory.setHost(host);
factory.setUsername(username);
factory.setPassword(password);
try {
connection = factory.newConnection();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
public void terminate(){
if (connection != null && connection.isOpen()){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private boolean reopenConnectionIfNeeded(){
try {
if (connection == null){
connection = factory.newConnection();
return true;
}
if (!connection.isOpen()){
connection = factory.newConnection();
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
return false;
}
return true;
}
public boolean send(String message){
return this.send(defaultQueue, message);
}
public boolean send(String queue, String message){
try {
reopenConnectionIfNeeded();
Channel channel = connection.createChannel();
channel.basicPublish("", queue, null, message.getBytes());
channel.close();
return true;
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
return false;
}
public String receive(String queue) {
try {
reopenConnectionIfNeeded();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel);
return channel.basicConsume(queue, true, consumer);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
暂无答案!
目前还没有任何答案,快来回答吧!