Convert Storm wordcount topology to use a Kafka spout

lvjbypge  posted on 2021-06-21  in Storm

I am new to Storm and Kafka, and after some time I managed to install both on my local virtual machine. I currently have a working wordcount topology whose spout pulls sentences from a text file in Dropbox:

public void nextTuple() {
    final String APP_KEY = "XXXX";
    final String APP_SECRET = "XXXX";
    DbxAppInfo appInfo = new DbxAppInfo(APP_KEY, APP_SECRET);
    DbxRequestConfig config = new DbxRequestConfig("StormTopology/1.0", Locale.getDefault().toString());
    String accessToken = "XXXXXXXX";
    DbxClient client = new DbxClient(config, accessToken);

    String sentence = "";
    try {
        FileOutputStream outputStream = new FileOutputStream("fromSpout.txt");
        try {
            // Download the latest /spout.txt from Dropbox into a local file.
            DbxEntry.File downloadedFile = client.getFile("/spout.txt", null, outputStream);
            sentence = readFile("fromSpout.txt");
            // Nothing useful yet: back off briefly and try again on the next call.
            if (sentence == null || sentence.trim().isEmpty()) {
                Utils.sleep(1000);
                return;
            }
        } catch (DbxException ex) {
            // ignore and retry on the next nextTuple() call
        } catch (IOException ex) {
            // ignore and retry on the next nextTuple() call
        } finally {
            outputStream.close();
        }
    } catch (FileNotFoundException ex) {
        // local temp file could not be created
    } catch (IOException ex) {
        // closing the stream failed
    }

    if (sentence.length() < 2) {
        Utils.sleep(10000);
        return;
    }

    // Delete the source file so the same sentence is not emitted twice.
    try {
        client.delete("/spout.txt");
    } catch (DbxException ex) {
    }

    _collector.emit(new Values(sentence));
    Utils.sleep(1000);
}

Now I want to upgrade my spout to read the text from Kafka instead, so that it can be emitted to the next bolt in my topology. I have tried reading many articles and code repositories on GitHub, for example this Kafka spout, but without success. Could anyone help me implement this new spout.java file? Thank you!

juud5qan

Since the Storm 0.9.2 release there is an external storm-kafka package that does exactly this. The package was contributed to the Storm community from storm-kafka-0.8-plus, and there is also a test project that demonstrates its usage.
Specifically, first add the dependency to your Maven (or lein/gradle) build:

groupId: org.apache.storm
artifactId: storm-kafka
version: 0.9.2-incubating
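
In a pom.xml those coordinates correspond to an entry roughly like the following (a sketch assuming storm-core and the rest of the build are already configured; only the storm-kafka artifact is shown):

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.2-incubating</version>
</dependency>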

Then create the topology and the spout like this:

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.*;
import storm.kafka.trident.*;
import storm.trident.TridentTopology;

TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
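
If you would rather keep the plain (non-Trident) wordcount topology from the question, the same storm-kafka package also ships a regular KafkaSpout. Below is a minimal sketch, not taken from the question's code: the topic name "test-topic", the ZooKeeper root "/kafka-spout", the consumer id "wordcount", and the component names are placeholder assumptions, and your existing split/count bolts would be attached where the comment indicates:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.*;

public class KafkaWordCountTopology {
    public static void main(String[] args) throws Exception {
        // ZooKeeper ensemble that the Kafka brokers register with.
        BrokerHosts zk = new ZkHosts("localhost");

        // SpoutConfig(hosts, topic, zkRoot, consumerId) -- the last three are placeholder names.
        SpoutConfig spoutConf = new SpoutConfig(zk, "test-topic", "/kafka-spout", "wordcount");
        // Decode each Kafka message as a single String tuple (field name "str").
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConf), 1);
        // Attach the existing split/count bolts here, reading the "str" field
        // emitted by the KafkaSpout instead of the Dropbox spout's sentence field, e.g.:
        // builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("kafka-spout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafka-wordcount", new Config(), builder.createTopology());
    }
}

The KafkaSpout keeps its consumer offsets in ZooKeeper under the given zkRoot/consumerId, so restarting the topology resumes from where it left off instead of re-reading the whole topic.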
