## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
19条答案
按热度按时间6ioyuze21#
这里有很多很好的答案,但在其中,我没有找到一个关于docker的。我花了一些时间来找出使用代理容器在这种情况下是错误的(显然!!!)
我应该用
zookeeper:2181
而不是--zookeeper localhost:2181
根据我的文件正确的命令是
希望能节省别人的时间。
另外,请注意,这些消息不会立即被删除,而且会在关闭日志段时发生。
mf98qq942#
kafka没有清除/清理主题(队列)的直接方法,但是可以通过删除该主题并重新创建它来实现。
首先,确保sever.properties文件具有,如果没有,则添加
delete.topic.enable=true
然后,删除主题bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic
然后再创建一次。liwlm1x93#
在kafka 0.8.2中测试,对于快速入门示例:首先,在config文件夹下的server.properties文件中添加一行:
然后,可以运行以下命令:
vcirk6k64#
以下是删除名为
MyTopic
:描述主题,不要考虑代理ID
为列出的每个代理id停止apachekafka守护程序。
连接到每个代理,并删除主题数据文件夹,例如。
rm -rf /tmp/kafka-logs/MyTopic-0
. 对其他分区和所有副本重复此操作删除主题元数据:
zkCli.sh
那么rmr /brokers/MyTopic
为每个停止的机器启动apachekafka守护程序如果您错过了步骤3,那么apachekafka将继续报告当前主题(例如,如果您运行
kafka-list-topic.sh
).使用ApacheKafka0.8.0进行测试。
rur96b6h5#
最简单的方法是将各个日志文件的日期设置为早于保留期。然后经纪人应该清理它们,并在几秒钟内为您移除它们。这有几个优点:
不需要关闭代理,这是一个运行时操作。
避免出现无效偏移异常的可能性(下面将详细介绍)。
根据我使用kafka0.7.x的经验,删除日志文件并重新启动代理可能会导致某些使用者出现无效的偏移异常。这是因为代理在零处重新启动偏移量(在没有任何现有日志文件的情况下),而以前从主题中消费的使用者将重新连接以请求特定的[一旦有效]偏移量。如果这个偏移量恰好落在新主题日志的边界之外,那么就不会有任何损害,并且使用者会在开始或结束时恢复。但是,如果偏移量落在新主题日志的范围内,代理将尝试获取消息集,但失败,因为偏移量没有与实际消息对齐。
这可以通过清除zookeeper中该主题的消费者补偿来缓解。但是,如果您不需要一个原始主题,只想删除现有的内容,那么简单地“触摸”几个主题日志要比停止代理、删除主题日志和清除某些zookeeper节点容易得多,也更可靠。
q3qa4bjr6#
要清除队列,可以删除主题:
然后重新创建:
tzdcorbm7#
另一种手动清除主题的方法是:
在经纪人中:
阻止Kafka的经纪人
sudo service kafka stop
删除所有分区日志文件(应在所有代理上执行)sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*
Zookeeper:运行zookeeper命令行界面
sudo /usr/lib/zookeeper/bin/zkCli.sh
使用zkcli删除主题元数据rmr /brokers/topic/<some_topic_name>
在经纪人中:重新启动代理服务
sudo service kafka start
b1uwtaje8#
虽然公认的答案是正确的,但该方法已被弃用。主题配置现在应该通过
kafka-configs
.通过此方法设置的配置可以通过命令显示
nafvub8i9#
无法添加为注解,因为大小:不确定这是否为真,除了更新retention.ms和retention.bytes之外,但我注意到主题清理策略应该是“delete”(默认值),如果是“compact”,它将保留更长的邮件,即,如果是“compact”,您还必须指定delete.retention.ms。
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
还得监控最早/最新的偏移量应相同,以确认这是否成功发生,还可以查看du-h/tmp/kafka日志/test-topic-3-100-*./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762
另一个问题是,您必须首先获取当前配置,以便记住在删除成功后还原:./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
3wabscal10#
有时,如果您有一个饱和的集群(太多的分区,或者使用加密的主题数据,或者使用ssl,或者控制器在一个坏节点上,或者连接是脆弱的),那么清除该主题将需要很长时间。
我遵循这些步骤,尤其是在使用avro时。
1:使用Kafka工具运行:
2:在架构注册表节点上运行:
kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning
3:一旦主题为空,将主题保留设置回原始设置。希望这能帮助别人,因为它不容易宣传。
wlsrxk5111#
使用应用程序组(groupname应与application kafka group name相同)清除特定主题中的所有消息。
./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group
qyzbxkaa12#
来自Kafka1.1
清除主题
等待1分钟,以确保Kafka清除主题删除配置,然后转到默认值
vs3odd8k13#
将主题的保留时间临时更新为1秒:
在较新的Kafka版本中,你也可以用
kafka-configs --entity-type topics
```kafka-configs.sh --zookeeper :2181 --entity-type topics --alter --entity-name --add-config retention.ms=1000
vdgimpew14#
在@steven appleyard answer之后,我在kafka 2.2.0上执行了以下命令,它们为我工作。
hc2pp10m15#
这应该给
retention.ms
已配置。然后您可以使用上面的alter命令更改为1秒(稍后恢复为默认值)。