Kafka溪流:自动创建源主题

epggiuax  于 2023-03-22  发布在  Apache
关注(0)|答案(1)|浏览(135)

我目前正在使用Kafka Streams,它需要将消息推送到入站主题(源)中,在入站主题中做一些处理,然后将处理后的消息推送到出站主题。我的java App正在使用出站主题中的消息。当我第一次运行该应用时,我看到以下错误消息:

The following source topics are missing/unknown: [local.inbound.topic]. Please make sure all source topics have been pre-created before starting the Streams application.

我研究了一下,发现对于Kafka Streams,入站主题需要提前创建。一种选择是在部署我的应用程序之前让管理员创建主题,以便Kafka Streams准备好从入站主题读取。另一种是以编程方式创建此主题。我想知道是否有一种方法可以通过Java实现这一点(在Streams runner启动之前自动创建入站主题)?而且,这样做是明智的吗?谢谢。

zxlwwiss

zxlwwiss1#

是的,你可以使用Kafka的admin API创建主题,如下所示:

Properties properties = new Properties();
properties.put(
  AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()
);

Admin admin = Admin.create(properties)

int partitions = 1;
short replicationFactor = 1;
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);

CreateTopicsOptions topicOptions = new CreateTopicsOptions()
  .validateOnly(true)
  .retryOnQuotaViolation(false);

CreateTopicsResult result = admin.createTopics(
  Collections.singleton(newTopic), topicOptions
);

相关问题