结合storm和kafka的词数拓扑

9jyewag0  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(318)

我正在尝试整合单词计数程序的风暴与Kafka,因为我的生产者是工作良好,即它是阅读文本文件和发送每一行作为一个消息,我可以看到这些消息在简单的消费者控制台。现在,为了将它与storm集成,即将这些消息/行发送到consumer spout,我刚刚用storm spout集成依赖项中的kafka spout替换了以前的单词计数程序的storm spout,程序的其余部分是相同的,我正在尝试在eclipse中运行它,但它没有得到执行,我不知道是什么问题,甚至不知道我做的是否正确,这是我的主要课程-

package com.spnotes.storm;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;

import com.spnotes.storm.bolts.WordCounterBolt;
import com.spnotes.storm.bolts.WordSpitterBolt;

public class WordCount {

public static void main(String[] args) throws Exception{
    Config config = new Config();
    config.setDebug(true);
    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    BrokerHosts hosts = new ZkHosts("localhost:9092");
    SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "localhost:2181", "id1");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("line-reader-spout", kafkaSpout);

    builder.setBolt("word-spitter", new WordSpitterBolt()).shuffleGrouping("line-reader-spout");

    builder.setBolt("word-counter", new WordCounterBolt()).shuffleGrouping("word-spitter");

    LocalCluster cluster = new LocalCluster();
    System.out.println("submit topology");
    Thread.sleep(10000);
    //StormSubmitter.submitTopology("HelloStorm5", config, builder.createTopology());
    cluster.submitTopology("HelloStorm5", config, builder.createTopology());
    cluster.shutdown();
}

}

有2个螺栓wordsplitterbolt()和wordcounterbolt(),wordsplitterbolt将每行/消息拆分为标记/单词,wordcounterbolt将计算每个单词的数量。有人能告诉我我做错了什么吗?我需要创建自己的喷口,而不是使用预定义的Kafka吗?我的主课对吗?

wixjitnu

wixjitnu1#

变更代码:

BrokerHosts hosts = new ZkHosts(zkConnect);

zkconnect是zookeeper主机名和端口,不适用于kafka。把它改成localhost:2181
正如讨论过的关于chat for rest问题的相关代码。
maven dependency出现问题。需要将所有依赖项包含到pom.xml中。

相关问题