Apache风暴三叉戟和Kafka喷口集成

8cdiaqws  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(335)

我找不到好的文档来正确地将Kafka与apache storm trident集成。我试图调查相关的问题,以前张贴在这里,但没有足够的信息。
我想把三叉戟和Kafka连接起来作为一个不透明的三叉戟。下面是当前正在运行的示例代码

GlobalPartitionInformation globalPartitionInformation  = new GlobalPartitionInformation(properties.getProperty("topic", "mytopic"));
Broker brokerForPartition0 = new Broker("IP1",9092);
Broker brokerForPartition1 = new Broker("IP2", 9092);
Broker brokerForPartition2 = new Broker("IP3:9092");

globalPartitionInformation.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
globalPartitionInformation.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
globalPartitionInformation.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
StaticHosts staticHosts = new StaticHosts(globalPartitionInformation);
TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts,properties.getProperty("topic", "mytopic"));
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig);

有了它,我可以为我的拓扑生成流,如下面的代码所示

TridentTopology topology = new TridentTopology();
Stream analyticsStream  = topology.newStream("spout", kafkaSpout).parallelismHint(Integer.valueOf(properties.getProperty("spout","6")))

虽然我已经提供了并行性和分区,但是只有一个kafka喷口的执行器正在运行,因此我无法很好地扩展它。
有谁能告诉我如何更好地集成ApacheStormTrident(2.0.0)和ApacheKafka(1.0),每个集群有3个节点?
而且,一旦它读完Kafka的作品,我就会不断收到这些日志

2018-04-09 14:17:34.119 o.a.s.k.KafkaUtils Thread-15-spout-spout-executor[79 79] [INFO] Metrics Tick: Not enough data to calculate spout lag.  2018-04-09 14:17:34.129 o.a.s.k.KafkaUtils Thread-21-spout-spout-executor[88 88] [INFO] Metrics Tick: Not enough data to calculate spout lag.

在StormUI中,我可以看到上面消息的ACK。有没有建议忽略公制刻度?

jrcvhitl

jrcvhitl1#

如果你是风暴2.0.0无论如何,我认为你应该切换到风暴Kafka客户端三叉戟喷口。storm kafka模块仅用于支持较旧的kafka版本,因为底层的kafka api(simpleconsumer)正在被删除。新模块支持0.10.0.0及以后版本的Kafka。
您可以在这里找到新喷口的三叉戟拓扑示例https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/tridentkafkaclienttopologynamedtopics.java.

相关问题