Updating the TTL of a specific Kafka topic using Java

zdwk9cvp  posted on 2021-06-08  in  Kafka

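I need to update the TTL of one topic so that records in that topic are retained for 10 days. This must apply to that specific topic only; every other topic should keep its current TTL configuration. I have to do this from Java, because I publish to the topic from Java. I am setting the following properties for the producer that pushes to Kafka:

```
Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_SERVERS);
props.put("acks", ACKS);
props.put("retries", RETRIES);
// Integer.valueOf replaces the deprecated new Integer(...) constructor
props.put("linger.ms", Integer.valueOf(LINGER_MS));
props.put("buffer.memory", Integer.valueOf(BUFFER_MEMORY));
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
```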

gupuwyp2  1#

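You can use AdminClient. The snippet below fetches the current configuration (for testing only) and then updates the retention.ms setting on a topic named "test". Only the topic named in the ConfigResource is changed; other topics are left as they are.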

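```
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class UpdateTopicRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // try-with-resources closes the client even if a call fails
        try (AdminClient adminClient = AdminClient.create(props)) {
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "test");

            // get the current topic configuration
            Map<ConfigResource, Config> config =
                    adminClient.describeConfigs(Collections.singleton(resource)).all().get();
            System.out.println(config);

            // create a new entry for updating the retention.ms value on the same topic
            // (50000 ms is 50 seconds; 10 days would be 864000000)
            ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "50000");
            Map<ConfigResource, Config> updateConfig = new HashMap<>();
            updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));

            // alterConfigs is deprecated since Kafka 2.3 in favor of incrementalAlterConfigs;
            // get() blocks until the broker has applied the change
            adminClient.alterConfigs(updateConfig).all().get();

            // read the configuration back to confirm the new value
            config = adminClient.describeConfigs(Collections.singleton(resource)).all().get();
            System.out.println(config);
        }
    }
}
```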
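
The question asks for a retention of 10 days, whereas the answer's example value of "50000" is only 50 seconds, since retention.ms is expressed in milliseconds. A minimal sketch of the conversion, assuming a hypothetical helper class `RetentionMs` that is not part of the original answer or of the Kafka API:

```java
import java.time.Duration;

// Hypothetical helper (not from the original answer): converts a retention
// period given in days into the millisecond string used for retention.ms.
class RetentionMs {

    static String forDays(long days) {
        if (days < 0) {
            throw new IllegalArgumentException("days must be non-negative");
        }
        // Duration handles the days -> milliseconds arithmetic
        return Long.toString(Duration.ofDays(days).toMillis());
    }

    public static void main(String[] args) {
        // 10 days * 24 h * 60 min * 60 s * 1000 ms
        System.out.println(forDays(10)); // prints 864000000
    }
}
```

The returned string could be passed as the value of the ConfigEntry for TopicConfig.RETENTION_MS_CONFIG in place of "50000".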
