我们有一个拓扑,有多个Kafka喷口任务。每个喷口任务都应该从一组Kafka主题中读取消息的子集。必须使用诸如aaa.bbb.*之类的通配符订阅主题。预期的行为是,所有spout任务将集体使用与通配符匹配的所有主题中的所有消息。每条消息只路由到一个喷口任务(忽略失败场景)。目前是否支持此功能?
mzsu5hc01#
也许您可以使用dynamicbrokersreader类。
Map conf = new HashMap(); ... conf.put("kafka.topic.wildcard.match", true); wildCardBrokerReader = new DynamicBrokersReader(conf, connectionString, masterPath, "AAA.BBB.*"); List<GlobalPartitionInformation> partitions = wildCardBrokerReader.getBrokerInfo(); ... for (GlobalPartitionInformation eachTopic: partitions) { StaticHosts hosts = new StaticHosts(eachTopic); SpoutConfig spoutConfig = new SpoutConfig(hosts, eachTopic.topic, zkRoot, id); KafkaSpout spout = new KafkaSpout(spoutConfig); } ... // Wrap those created spout instances into a container
1条答案
按热度按时间mzsu5hc01#
也许您可以使用dynamicbrokersreader类。