清除Kafka主题

x6492ojm  于 2021-06-07  发布在  Kafka
关注(0)|答案(19)|浏览(441)

有没有办法清除Kafka的主题?
我将一条太大的消息推送到本地计算机上的Kafka消息主题中,现在出现了一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加 fetch.size 在这里不是很理想,因为我实际上不想接受那么大的信息。

6ioyuze2

6ioyuze21#

这里有很多很好的答案,但在其中,我没有找到一个关于docker的。我花了一些时间来找出使用代理容器在这种情况下是错误的(显然!!!)


## this is wrong!

docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
        at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
        at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
        at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
        at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
        at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

我应该用 zookeeper:2181 而不是 --zookeeper localhost:2181 根据我的文件


## this might be an option, but as per comment below not all zookeeper images can have this script included

docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000

正确的命令是

docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000

希望能节省别人的时间。
另外,请注意,这些消息不会立即被删除,而且会在关闭日志段时发生。

mf98qq94

mf98qq942#

kafka没有清除/清理主题(队列)的直接方法,但是可以通过删除该主题并重新创建它来实现。
首先,确保sever.properties文件具有,如果没有,则添加 delete.topic.enable=true 然后,删除主题 bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic 然后再创建一次。

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
liwlm1x9

liwlm1x93#

在kafka 0.8.2中测试,对于快速入门示例:首先,在config文件夹下的server.properties文件中添加一行:

delete.topic.enable=true

然后,可以运行以下命令:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
vcirk6k6

vcirk6k64#

以下是删除名为 MyTopic :
描述主题,不要考虑代理ID
为列出的每个代理id停止apachekafka守护程序。
连接到每个代理,并删除主题数据文件夹,例如。 rm -rf /tmp/kafka-logs/MyTopic-0 . 对其他分区和所有副本重复此操作
删除主题元数据: zkCli.sh 那么 rmr /brokers/MyTopic 为每个停止的机器启动apachekafka守护程序
如果您错过了步骤3,那么apachekafka将继续报告当前主题(例如,如果您运行 kafka-list-topic.sh ).
使用ApacheKafka0.8.0进行测试。

rur96b6h

rur96b6h5#

最简单的方法是将各个日志文件的日期设置为早于保留期。然后经纪人应该清理它们,并在几秒钟内为您移除它们。这有几个优点:
不需要关闭代理,这是一个运行时操作。
避免出现无效偏移异常的可能性(下面将详细介绍)。
根据我使用kafka0.7.x的经验,删除日志文件并重新启动代理可能会导致某些使用者出现无效的偏移异常。这是因为代理在零处重新启动偏移量(在没有任何现有日志文件的情况下),而以前从主题中消费的使用者将重新连接以请求特定的[一旦有效]偏移量。如果这个偏移量恰好落在新主题日志的边界之外,那么就不会有任何损害,并且使用者会在开始或结束时恢复。但是,如果偏移量落在新主题日志的范围内,代理将尝试获取消息集,但失败,因为偏移量没有与实际消息对齐。
这可以通过清除zookeeper中该主题的消费者补偿来缓解。但是,如果您不需要一个原始主题,只想删除现有的内容,那么简单地“触摸”几个主题日志要比停止代理、删除主题日志和清除某些zookeeper节点容易得多,也更可靠。

q3qa4bjr

q3qa4bjr6#

要清除队列,可以删除主题:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

然后重新创建:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test
tzdcorbm

tzdcorbm7#

另一种手动清除主题的方法是:
在经纪人中:
阻止Kafka的经纪人 sudo service kafka stop 删除所有分区日志文件(应在所有代理上执行) sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-* Zookeeper:
运行zookeeper命令行界面 sudo /usr/lib/zookeeper/bin/zkCli.sh 使用zkcli删除主题元数据 rmr /brokers/topic/<some_topic_name> 在经纪人中:
重新启动代理服务 sudo service kafka start

b1uwtaje

b1uwtaje8#

虽然公认的答案是正确的,但该方法已被弃用。主题配置现在应该通过 kafka-configs .

kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic

通过此方法设置的配置可以通过命令显示

kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
nafvub8i

nafvub8i9#

无法添加为注解,因为大小:不确定这是否为真,除了更新retention.ms和retention.bytes之外,但我注意到主题清理策略应该是“delete”(默认值),如果是“compact”,它将保留更长的邮件,即,如果是“compact”,您还必须指定delete.retention.ms。 ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1 还得监控最早/最新的偏移量应相同,以确认这是否成功发生,还可以查看du-h/tmp/kafka日志/test-topic-3-100-*
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762 ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762 另一个问题是,您必须首先获取当前配置,以便记住在删除成功后还原: ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics

3wabscal

3wabscal10#

有时,如果您有一个饱和的集群(太多的分区,或者使用加密的主题数据,或者使用ssl,或者控制器在一个坏节点上,或者连接是脆弱的),那么清除该主题将需要很长时间。
我遵循这些步骤,尤其是在使用avro时。
1:使用Kafka工具运行:

bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>

2:在架构注册表节点上运行: kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning 3:一旦主题为空,将主题保留设置回原始设置。

bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>

希望这能帮助别人,因为它不容易宣传。

wlsrxk51

wlsrxk5111#

使用应用程序组(groupname应与application kafka group name相同)清除特定主题中的所有消息。 ./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group

qyzbxkaa

qyzbxkaa12#

来自Kafka1.1
清除主题

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100

等待1分钟,以确保Kafka清除主题删除配置,然后转到默认值

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms
vs3odd8k

vs3odd8k13#

将主题的保留时间临时更新为1秒:

kafka-topics.sh --zookeeper <zkhost>:2181 --alter --topic <topic name> --config retention.ms=1000

在较新的Kafka版本中,你也可以用 kafka-configs --entity-type topics ```
kafka-configs.sh --zookeeper :2181 --entity-type topics --alter --entity-name --add-config retention.ms=1000

然后等待清洗生效(大约一分钟)。清除后,恢复以前的 `retention.ms` 价值观。
vdgimpew

vdgimpew14#

在@steven appleyard answer之后,我在kafka 2.2.0上执行了以下命令,它们为我工作。

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms
hc2pp10m

hc2pp10m15#

./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic

这应该给 retention.ms 已配置。然后您可以使用上面的alter命令更改为1秒(稍后恢复为默认值)。

Topic:myTopic   PartitionCount:6        ReplicationFactor:1     Configs:retention.ms=86400000

相关问题