清除Kafka主题

x6492ojm  于 2021-06-07  发布在  Kafka
关注(0)|答案(19)|浏览(446)

有没有办法清除Kafka的主题?
我将一条太大的消息推送到本地计算机上的Kafka消息主题中,现在出现了一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加 fetch.size 在这里不是很理想,因为我实际上不想接受那么大的信息。

toe95027

toe9502716#

以下命令可用于删除Kafka主题中的所有现有消息:

kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json

delete.json文件的结构应如下所示:
{“partitions”:[{“topic”:“foo”,“partition”:1,“offset”:-1}],“version”:1}
其中offset:-1将删除所有记录(该命令已通过kafka 2.0.1测试)

htzpubme

htzpubme17#

更新:这个答案与Kafka0.6有关。对于Kafka0.8和更高版本,请参见@patrick的答案。
是的,停止kafka并手动删除相应子目录中的所有文件(在kafka数据目录中很容易找到它)。Kafka重启后,主题将为空。

ergxz8rk

ergxz8rk18#

来自java,使用新的 AdminZkClient 而不是不推荐的 AdminUtils :

public void reset() {
    try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
        5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {

      for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
        deleteTopic(entry.getKey(), zkClient);
      }
    }
  }

  private void deleteTopic(String topic, KafkaZkClient zkClient) {

    // skip Kafka internal topic
    if (topic.startsWith("__")) {
      return;
    }

    System.out.println("Resetting Topic: " + topic);
    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
    adminZkClient.deleteTopic(topic);

    // deletions are not instantaneous
    boolean success = false;
    int maxMs = 5_000;
    while (maxMs > 0 && !success) {
      try {
        maxMs -= 100;
        adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
        success = true;
      } catch (TopicExistsException ignored) {
      }
    }

    if (!success) {
      Assert.fail("failed to create " + topic);
    }
  }

  private Map<String, List<PartitionInfo>> listTopics() {
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
    props.put("group.id", "test-container-consumer-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Map<String, List<PartitionInfo>> topics = consumer.listTopics();
    consumer.close();

    return topics;
  }
wztqucjr

wztqucjr19#

托马斯的建议很好,但很不幸 zkCli 在zookeeper的旧版本中(例如3.3.6)似乎不支持 rmr . 例如,将现代zookeeper中的命令行实现与版本3.3进行比较。
如果您面对的是一个旧版本的zookeeper,那么一个解决方案就是使用一个客户端库,比如zc.zk for python。对于不熟悉python的人,需要使用pip或easy\u安装。然后启动python shell( python )你可以做到:

import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic')

甚至

zk.delete_recursive('brokers')

如果你想删除Kafka的所有主题。

相关问题