有没有办法清除Kafka的主题?我将一条太大的消息推送到本地计算机上的Kafka消息主题中,现在出现了一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加 fetch.size 在这里不是很理想,因为我实际上不想接受那么大的信息。
fetch.size
toe9502716#
以下命令可用于删除Kafka主题中的所有现有消息:
kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json
delete.json文件的结构应如下所示:{“partitions”:[{“topic”:“foo”,“partition”:1,“offset”:-1}],“version”:1}其中offset:-1将删除所有记录(该命令已通过kafka 2.0.1测试)
htzpubme17#
更新:这个答案与Kafka0.6有关。对于Kafka0.8和更高版本,请参见@patrick的答案。是的,停止kafka并手动删除相应子目录中的所有文件(在kafka数据目录中很容易找到它)。Kafka重启后,主题将为空。
ergxz8rk18#
来自java,使用新的 AdminZkClient 而不是不推荐的 AdminUtils :
AdminZkClient
AdminUtils
public void reset() { try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000, 5000, 10, Time.SYSTEM, "metricGroup", "metricType")) { for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) { deleteTopic(entry.getKey(), zkClient); } } } private void deleteTopic(String topic, KafkaZkClient zkClient) { // skip Kafka internal topic if (topic.startsWith("__")) { return; } System.out.println("Resetting Topic: " + topic); AdminZkClient adminZkClient = new AdminZkClient(zkClient); adminZkClient.deleteTopic(topic); // deletions are not instantaneous boolean success = false; int maxMs = 5_000; while (maxMs > 0 && !success) { try { maxMs -= 100; adminZkClient.createTopic(topic, 1, 1, new Properties(), null); success = true; } catch (TopicExistsException ignored) { } } if (!success) { Assert.fail("failed to create " + topic); } } private Map<String, List<PartitionInfo>> listTopics() { Properties props = new Properties(); props.put("bootstrap.servers", kafkaContainer.getBootstrapServers()); props.put("group.id", "test-container-consumer-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); Map<String, List<PartitionInfo>> topics = consumer.listTopics(); consumer.close(); return topics; }
wztqucjr19#
托马斯的建议很好,但很不幸 zkCli 在zookeeper的旧版本中(例如3.3.6)似乎不支持 rmr . 例如,将现代zookeeper中的命令行实现与版本3.3进行比较。如果您面对的是一个旧版本的zookeeper,那么一个解决方案就是使用一个客户端库,比如zc.zk for python。对于不熟悉python的人,需要使用pip或easy\u安装。然后启动python shell( python )你可以做到:
zkCli
rmr
python
import zc.zk zk = zc.zk.ZooKeeper('localhost:2181') zk.delete_recursive('brokers/MyTopic')
甚至
zk.delete_recursive('brokers')
如果你想删除Kafka的所有主题。
19条答案
按热度按时间toe9502716#
以下命令可用于删除Kafka主题中的所有现有消息:
delete.json文件的结构应如下所示:
{“partitions”:[{“topic”:“foo”,“partition”:1,“offset”:-1}],“version”:1}
其中offset:-1将删除所有记录(该命令已通过kafka 2.0.1测试)
htzpubme17#
更新:这个答案与Kafka0.6有关。对于Kafka0.8和更高版本,请参见@patrick的答案。
是的,停止kafka并手动删除相应子目录中的所有文件(在kafka数据目录中很容易找到它)。Kafka重启后,主题将为空。
ergxz8rk18#
来自java,使用新的
AdminZkClient
而不是不推荐的AdminUtils
:wztqucjr19#
托马斯的建议很好,但很不幸
zkCli
在zookeeper的旧版本中(例如3.3.6)似乎不支持rmr
. 例如,将现代zookeeper中的命令行实现与版本3.3进行比较。如果您面对的是一个旧版本的zookeeper,那么一个解决方案就是使用一个客户端库,比如zc.zk for python。对于不熟悉python的人,需要使用pip或easy\u安装。然后启动python shell(
python
)你可以做到:甚至
如果你想删除Kafka的所有主题。