如何使用新的kafka消费api编写samplest kafka喷口?

9vw9lbht  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(243)

喜欢风暴Kafka客户端,我用风暴Kafka客户端,但不能很好地工作,写一个新的喷口不工作了。谁能帮我写一个Kafka的小插曲。

tpgth1q7

tpgth1q71#

定义拓扑.java

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class Topology{
  public static void main(String[] args){
     TopologyBuilder builder = new TopologyBuilder();
     String zkHosts = StringUtils.join("127.0.0.1", ',');

        BrokerHosts hosts = new ZkHosts(zkHosts);
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "kafkaTopic_name", "/kafkaTopic_name", "kafkaGroup_name");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.forceFromStart = forceFromStart;
        builder.setSpout("events", new KafkaSpout(spoutConfig), 5).setNumTasks(5);
        //...
  }
}

基本上,为了创建kafkaspout,您需要创建spoutconfig。

相关问题