apache storm(本地)未连接到apache kafka(本地)

vltsax25  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(344)

我正在开发一个poc,它将读取来自Kafka的消息,并通过风暴实时处理它。我在当地开了个Zookeeper和Kafka。我创建了一个主题(名为 test ),生产者和消费者,它们在命令提示符下工作正常。现在我想用storm阅读主题中的信息。当我试着运行下面的代码时,风暴喷口没有连接到Kafka/Zookeeper。这在日志中很明显,因为在任何地方都没有提到localhost或2181。这个过程失败了
6939[thread-15-eventsemitter-executor[2]]info o.a.s.k.partitionmanager-从:/test/storm/partition\u 0-->null读取分区信息

public class TestTopology {

    public static void main(String[] args) {

        BrokerHosts zkHosts = new ZkHosts("localhost:2181");
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "test", "/test", "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("eventsEmitter", kafkaSpout, 1);
        builder.setBolt("eventsProcessor", new WordCountBolt(), 1).shuffleGrouping("eventsEmitter");
        Config config = new Config();
        config.setMaxTaskParallelism(5);
        /*
         * config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2);
         * 
         * config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
         * config.put(Config.STORM_ZOOKEEPER_SERVERS,
         * Arrays.asList("localhost"));
         */

        try {
            ILocalCluster cls = new LocalCluster();         
            cls.submitTopology("my-topology", config, builder.createTopology());
        } catch (Exception e) {
            throw new IllegalStateException("Couldn't initialize the topology",
                    e);
        }
    }

}

它连接的是它创建的本地zookeeper,而不是运行kafka的zookeeper

4632 [Thread-11] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting
4632 [Thread-11] INFO  o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@acd1da
4633 [Thread-11-SendThread(127.0.0.1:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
4634 [Thread-11-SendThread(127.0.0.1:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2000, initiating session
4634 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:62287
4634 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:62287
4635 [SyncThread:0] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x154d458c4130011 with negotiated timeout 20000 for client /127.0.0.1:62287
4635 [Thread-11-SendThread(127.0.0.1:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2000, sessionid = 0x154d458c4130011, negotiated timeout = 20000
4635 [Thread-11-EventThread] INFO  o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED

如果你需要更多的信息,请告诉我。

gg0vcinb

gg0vcinb1#

您必须配置 config 以便它知道kafka服务器端口,例如:

Properties props = new Properties();
//default broker port = 9092
props.put("metadata.broker.list", "localhost:" + BROKER_PORT); 
props.put("request.required.acks", "1");
props.put("serializer.class", "kafka.serializer.StringEncoder");

Config config = new Config();        
config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
config.setDebug(true);
config.setMaxTaskParallelism(5);
x4shl7ld

x4shl7ld2#

经过一个紧张的夜晚,我想出了解决这个问题的办法。实际上问题不在代码上,而在jar上。我附上了所有3个软件包的log4j jar,即zookeeper、kafka和storm,但代码只需要一个。这在我的Eclipse中显示为红色警告,我之前忽略了它。当我删除不必要的log4js时,Kafka的喷口开始阅读我创建的Kafka主题。感谢大家花时间研究这个问题@马蒂亚斯我想既然我把它和Zookeeper联系起来了,它就和Kafka所管理的任何东西联系起来了。因此,至少在地方一级提到这一点可能是不必要的,但无论如何还是要感谢。。

相关问题