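Apologies if this has already been answered; I searched but had no luck. There are some similar questions, but none of the ones I found helped. Here is my problem: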
603 [main] WARN b.s.StormSubmitter - Topology submission exception:
Component: [escribirFichero] subscribes from non-existent stream:
[default] of component [buscamosEnKlout]
Exception in thread "main" java.lang.RuntimeException:
InvalidTopologyException(msg:Component:
[escribirFichero] subscribes from non-existent stream:
[default] of component [buscamosEnKlout])
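I don't understand why I get this exception. I declare the bolt "buscamosEnKlout" before "escribirFichero" subscribes to it. Below the topology I include the essential lines of both bolts. I know the spout is fine, which I established by trial and error.
My topology code is: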
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.stats.RollingWindow;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;
import bolt.*;
import spout.TwitterSpout;
import twitter4j.FilterQuery;
public class TwitterTopologia {
private static String consumerKey = "xxx1";
private static String consumerSecret = "xxx2";
private static String accessToken = "yyy1";
private static String accessTokenSecret="yyy2";
public static void main(String[] args) throws Exception {
/****************SETUP****************/
String remoteClusterTopologyName = null;
if (args!=null) { ... }
TopologyBuilder builder = new TopologyBuilder();
FilterQuery tweetFilterQuery = new FilterQuery();
tweetFilterQuery.track(new String[]{"Vacaciones","Holy Week", "Semana Santa","Holidays","Vacation"});
tweetFilterQuery.language(new String[]{"en","es"});
TwitterSpout spout = new TwitterSpout(consumerKey, consumerSecret, accessToken, accessTokenSecret, tweetFilterQuery);
KloutBuscador buscamosEnKlout = new KloutBuscador();
FileWriterBolt fileWriterBolt = new FileWriterBolt("idUsuarios.txt");
builder.setSpout("spoutLeerTwitter",spout,1);
builder.setBolt("buscamosEnKlout",buscamosEnKlout,1).shuffleGrouping("spoutLeerTwitter");
builder.setBolt("escribirFichero",fileWriterBolt,1).shuffleGrouping("buscamosEnKlout");
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("twitter-fun", conf, builder.createTopology());
Thread.sleep(460000);
cluster.shutdown();
}
}
}
Next is the code of the bolt KloutBuscador, registered as "buscamosEnKlout":
String text = tuple.getStringByField("id");
String cadenaUrl;
cadenaUrl = "http://api.klout.com/v2/identity.json/twitter?screenName=";
cadenaUrl += text.replaceAll("\\[", "").replaceAll("\\]","");
cadenaUrl += "&key=" + kloutKey;
URL url = new URL(cadenaUrl);
HttpURLConnection c = (HttpURLConnection) url.openConnection();
...
c.setRequestMethod("GET");
c.setRequestProperty("Content-length", "0");
c.setUseCaches(false);
c.setAllowUserInteraction(false);
c.connect();
int status = c.getResponseCode();
StringBuilder sb = new StringBuilder();
switch (status) {
case 200:
case 201:
BufferedReader br = new BufferedReader(new InputStreamReader(c.getInputStream()));
String line;
while ((line = br.readLine()) != null) sb.append(line + "\n");
br.close();
}
JSONObject jsonResponse = new JSONObject(sb.toString());
//getJSONArray("id");
String results = jsonResponse.toString();
_collector.emit(new Values(text,results));
The second bolt, FileWriterBolt, registered as "escribirFichero", is the following:
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
_collector = outputCollector;
try {
writer = new PrintWriter(filename, "UTF-8");...}...}
public void execute(Tuple tuple) {
writer.println((count++)+":::"+tuple.getValues());
//+"+++"+tweet.getUser().getId()+"__FINAL__"+tweet.getUser().getName()
writer.flush();
// Confirm that this tuple has been treated.
//_collector.ack(tuple);
}
If I skip the Klout bolt and just write the spout's output to the file, it works. I don't understand why the Klout bolt causes this failure.
1 Answer
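The buscamosEnKlout bolt needs to declare the format of the tuples it emits and the streams it emits them to. Most likely you have not implemented declareOutputFields correctly in that bolt. Since the bolt emits two values per tuple (text and results), the declaration needs two field names. It should contain: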
declarer.declare(new Fields("your-text-field", "your-results-field"))
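To see why the missing declaration produces exactly this exception, here is a minimal, dependency-free sketch of the kind of check that happens at submission time. It is a simplified stand-in, not Storm's actual validation code: the class `StreamValidationSketch` and its methods are hypothetical, and the field names "id" and "kloutResponse" are placeholders. The idea it models is that `shuffleGrouping("buscamosEnKlout")` subscribes to that component's "default" stream, and the stream only exists once the component has declared output fields for it.

```java
import java.util.*;

// Simplified model of topology validation. NOT Storm's real implementation;
// all names here are hypothetical and exist only for illustration.
public class StreamValidationSketch {
    // component id -> (stream id -> declared field names)
    private final Map<String, Map<String, List<String>>> declared = new HashMap<>();
    // subscriber id -> list of {source component id, stream id}
    private final Map<String, List<String[]>> subscriptions = new LinkedHashMap<>();

    // Registers a component that has declared no output streams yet.
    public void addComponent(String id) {
        declared.putIfAbsent(id, new HashMap<>());
    }

    // Stand-in for declarer.declare(new Fields(...)): creates the "default" stream.
    public void declare(String id, String... fields) {
        addComponent(id);
        declared.get(id).put("default", Arrays.asList(fields));
    }

    // Stand-in for shuffleGrouping(source): subscribes to source's "default" stream.
    public void subscribe(String subscriber, String source) {
        addComponent(subscriber);
        subscriptions.computeIfAbsent(subscriber, k -> new ArrayList<>())
                     .add(new String[] {source, "default"});
    }

    // Returns one message per subscription that points at a missing component or stream.
    public List<String> validate() {
        List<String> errors = new ArrayList<>();
        for (Map.Entry<String, List<String[]>> entry : subscriptions.entrySet()) {
            for (String[] sub : entry.getValue()) {
                Map<String, List<String>> streams = declared.get(sub[0]);
                if (streams == null) {
                    errors.add("Component: [" + entry.getKey()
                            + "] subscribes from non-existent component [" + sub[0] + "]");
                } else if (!streams.containsKey(sub[1])) {
                    errors.add("Component: [" + entry.getKey()
                            + "] subscribes from non-existent stream: [" + sub[1]
                            + "] of component [" + sub[0] + "]");
                }
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        StreamValidationSketch topology = new StreamValidationSketch();
        topology.declare("spoutLeerTwitter", "id");                // spout declares its fields
        topology.subscribe("buscamosEnKlout", "spoutLeerTwitter"); // bolt declares nothing yet
        topology.subscribe("escribirFichero", "buscamosEnKlout");

        System.out.println("Before declaring: " + topology.validate());

        // Equivalent of adding the declare(...) call to the Klout bolt.
        topology.declare("buscamosEnKlout", "id", "kloutResponse");
        System.out.println("After declaring:  " + topology.validate());
    }
}
```

In the real bolt the fix goes inside `declareOutputFields(OutputFieldsDeclarer declarer)`. The file-writing bolt can leave that method empty because nothing subscribes to it.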