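当我在 IntelliJ IDEA 或 Eclipse 中运行一个简单的 Storm 示例程序时,遇到了如下错误:“无法规范化地址 localhost/<unresolved>:2000,因为它不可解析”(Unable to canonicalize address localhost/<unresolved>:2000 because it's not resolvable)。代码如下: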
import org.apache.storm.*;
import org.apache.storm.generated.*;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.*;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
/**
 * Example Storm topology: a spout emits random words and two chained bolts
 * each append "!!!" to the word they receive.
 *
 * <p>Run with no arguments to execute the topology in an in-process
 * {@link LocalCluster}; pass a topology name as the first argument to submit
 * it to a real cluster through {@link StormSubmitter}.
 */
public class ExclamationExample {

    /** How long the local cluster is left running before the topology is killed. */
    private static final int LOCAL_RUN_MILLIS = 600000;

    /** Spout that sleeps 100 ms, then emits one randomly chosen word per call. */
    public static class TestWordSpout extends BaseRichSpout {
        /** Fixed vocabulary the spout picks from. */
        private static final String[] WORDS = {"nathan", "mike", "jackson", "golda", "bertels"};

        private SpoutOutputCollector collector;
        private Random rand;

        /** Stores the collector and creates the random source used by {@link #nextTuple()}. */
        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            collector = spoutOutputCollector;
            // Created once here instead of on every nextTuple() call.
            rand = new Random();
        }

        /** Emits a single-field tuple holding a random word and echoes it to stdout. */
        @Override
        public void nextTuple() {
            Utils.sleep(100);
            final String word = WORDS[rand.nextInt(WORDS.length)];
            collector.emit(new Values(word));
            System.out.printf("Word spout: %s\n", word);
        }

        /** Declares the single output field, {@code "word"}. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("word"));
        }
    }

    /** Bolt that appends "!!!" to the first field of each incoming tuple. */
    public static class ExclamationBolt extends BaseRichBolt {
        private OutputCollector collector;

        /** Stores the collector used to emit and ack tuples. */
        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Emits the input string with "!!!" appended, anchored to the input
         * tuple, prints it, and then acks the input.
         */
        @Override
        public void execute(Tuple tuple) {
            String val = tuple.getString(0) + "!!!";
            collector.emit(tuple, new Values(val));
            System.out.printf("Exclamation bolt: %s\n", val);
            collector.ack(tuple);
        }

        /** Declares the single output field, {@code "word"}. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    /**
     * Builds the word -> exclaim1 -> exclaim2 topology and runs it.
     *
     * @param args if non-empty, {@code args[0]} is the topology name and the
     *             topology is submitted to a cluster with 3 workers; otherwise
     *             it runs locally as "test" with 2 workers
     * @throws Exception if the cluster submission fails
     */
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setDebug(false);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word", new TestWordSpout(), 2);
        builder.setBolt("exclaim1", new ExclamationBolt(), 2).shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

        // If there are arguments, we are running on a cluster.
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            // Stop here: previously execution fell through and also started a
            // local cluster after the remote submission.
            return;
        }

        conf.setNumWorkers(2);
        String topologyName = "test";
        // try-with-resources closes the LocalCluster on exit, so no explicit
        // shutdown() call is needed.
        try (LocalCluster cluster = new LocalCluster()) {
            cluster.submitTopology(topologyName, conf, builder.createTopology());
            Utils.sleep(LOCAL_RUN_MILLIS);
            cluster.killTopology(topologyName);
        } catch (Exception e) {
            // Local demo run: report the failure and let main finish.
            e.printStackTrace();
        }
    }
}
可能的解决方案是什么?
1条答案
按热度按时间7y4bm7vi1#
我已经解决了这个问题,在此分享解决方案,希望能帮到其他人。我之前是用 Java SDK 17 运行这个 Java 应用程序的,问题正是由此引起的;随后我用 JDK 1.8 新建了一个项目,一切就正常了!