喜欢风暴Kafka客户端,我用风暴Kafka客户端,但不能很好地工作,写一个新的喷口不工作了。谁能帮我写一个Kafka的小插曲。
tpgth1q71#
定义拓扑.java
import storm.kafka.BrokerHosts; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; public class Topology{ public static void main(String[] args){ TopologyBuilder builder = new TopologyBuilder(); String zkHosts = StringUtils.join("127.0.0.1", ','); BrokerHosts hosts = new ZkHosts(zkHosts); SpoutConfig spoutConfig = new SpoutConfig(hosts, "kafkaTopic_name", "/kafkaTopic_name", "kafkaGroup_name"); spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); spoutConfig.forceFromStart = forceFromStart; builder.setSpout("events", new KafkaSpout(spoutConfig), 5).setNumTasks(5); //... } }
基本上,为了创建kafkaspout,您需要创建spoutconfig。
1条答案
按热度按时间tpgth1q71#
定义拓扑.java
基本上,为了创建kafkaspout,您需要创建spoutconfig。