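I am using Confluent Platform 3.0.1 on Windows. I followed the installation guide and the developer guide to complete the installation and develop my topology.
I started ZooKeeper, then the Kafka server, and tried to run my topology, but I get the error below on the Kafka server. I see the same error even when I create the topic manually before running the topology.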
INFO Topic creation {"version":1,"partitions":{"0":[0]}} (kafka.admin.AdminUtils$)
[2016-09-21 17:20:08,807] INFO [KafkaApi-0] Auto creation of topic Text4 with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
[2016-09-21 17:20:09,436] ERROR [KafkaApi-0] Error when handling request {group_id=my-first-streams-application1} (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createTopic(KafkaApis.scala:629)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createGroupMetadataTopic(KafkaApis.scala:651)
at kafka.server.KafkaApis$$anonfun$getOrCreateGroupMetadataTopic$1.apply(KafkaApis.scala:657)
at kafka.server.KafkaApis$$anonfun$getOrCreateGroupMetadataTopic$1.apply(KafkaApis.scala:657)
at scala.Option.getOrElse(Option.scala:121)
at kafka.server.KafkaApis.getOrCreateGroupMetadataTopic(KafkaApis.scala:657)
at kafka.server.KafkaApis.handleGroupCoordinatorRequest(KafkaApis.scala:818)
at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:745)
My topology code is as follows:
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

public class MetricTopology implements InitializingBean {

    @Autowired
    @Qualifier("getStreamsConfig")
    private Properties properties;

    // Builds the topology and starts the streams application.
    public void buildTopology() {
        System.out.println("MetricTopology.buildTopology()");
        TopologyBuilder builder = new TopologyBuilder();
        // Add the source processor node that takes Kafka topic "Text4" as input.
        builder.addSource("Source", "Text4")
                // Add the MetricsProcessor node, which takes the source processor as its upstream processor.
                .addProcessor("Process", () -> new MetricsProcessor(), "Source");
        // Build and start the stream.
        KafkaStreams streams = new KafkaStreams(builder, properties);
        streams.start();
    }

    // Called by Spring after all properties are set.
    @Override
    public void afterPropertiesSet() throws Exception {
        buildTopology();
    }
}
Below are the properties I am using; they are defined in a separate Java source file.
Properties settings = new Properties();
// Set a few key parameters. These properties will be read from a property file.
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application1");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "1");
1 answer
lqfhib0f #1
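To create a topic with a replication factor of 3, you need at least 3 running brokers. The error message reports only 1 available broker, so the topic creation fails.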
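Why the broker count matters can be shown with a small sketch. This is a simplified illustration and not Kafka's actual AdminUtils code: the class name ReplicationCheck and the method assignReplicas are hypothetical, and the round-robin placement is an assumption made for the example. The point is that each replica of a partition must sit on a different broker, so a replication factor larger than the broker count cannot be satisfied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified illustration only; this is NOT Kafka's AdminUtils implementation.
final class ReplicationCheck {

    // Assigns `replicationFactor` distinct brokers to each partition, round-robin.
    // Fails when more replicas are requested than brokers exist, because two
    // replicas of the same partition cannot be placed on the same broker.
    static List<List<Integer>> assignReplicas(List<Integer> brokerIds, int partitions, int replicationFactor) {
        if (replicationFactor > brokerIds.size()) {
            throw new IllegalArgumentException("replication factor: " + replicationFactor
                    + " larger than available brokers: " + brokerIds.size());
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                // Distinct per partition because replicationFactor <= brokerIds.size().
                replicas.add(brokerIds.get((p + r) % brokerIds.size()));
            }
            assignment.add(replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<Integer> oneBroker = Arrays.asList(0);
        // One partition, one replica on a single broker: works.
        System.out.println(assignReplicas(oneBroker, 1, 1)); // prints [[0]]
        try {
            // One partition, three replicas on a single broker: rejected.
            assignReplicas(oneBroker, 1, 3);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note that the stack trace in the question passes through createGroupMetadataTopic, so the topic that fails to be created appears to be the broker's internal group metadata (offsets) topic rather than Text4, which was created with replication factor 1. StreamsConfig.REPLICATION_FACTOR_CONFIG applies to the internal topics created by the Streams application, so on a single-broker setup it may also be worth checking the broker-side offsets.topic.replication.factor setting in server.properties.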