我正在测试和调试一个运行在kafka之上并使用samza的事件源(或有状态流处理)应用程序。我想删除kafka中的队列和主题,以便samza jobs在启动时得到一个空的kafka安装。
我怎么做??
编辑:
这个问题比我一开始写的更复杂更具体。
正如david所说,有一个解决方案可以清除这个主题,从kafka 0.8.2开始:清除kafka队列
我感兴趣的是建立一个测试环境,自动加载zookeeper和kafka(它们作为二进制包捆绑在我的git存储库中)。
我正在使用gradle环境+eclipse和junit。我从eclipse运行集成测试(作为junit测试)。
如何实现装载的自动化?我应该创建一个特定的测试类来设置环境并启动kafka和zookeeper吗?是否有参考示例/代码?这个想法是加载环境,运行一些测试,然后停止。如果这个过程能在几秒钟内完成,那就更好了。
1条答案
按热度按时间oewdyzsn1#
清除单个主题有不同的方法。所有这些都可以扩展到清除所有主题。然而,我认为您提出了一个更大的问题,与为Kafka创建基线环境相关——您可能需要一些测试。或者你有一个生产过程,每次都从零开始。这实际上是不同的情况。
用于测试
如果你说的是测试,那么我会故意做一些暴力的事情。首先,我会将Kafka配置成我希望它在启动时的样子。然后我会关闭它并备份它——或者使用
tar
或者甚至可能制作一个磁盘映像,如果我使用的是虚拟机的话。我会在测试过程中使用它并滥用它,然后在测试完成后将其全部扔掉。”重置Kafka“只意味着通过
untar
或者从磁盘映像或者别的什么(rsync
甚至,或者只是cp
从另一个目录)。对于测试,我真的想要一个干净的开始,所以我更喜欢暴力。
生产过程中
如果这是你的生产过程的一部分——我从表面上怀疑这是否明智——那么我会尽量先不丢失数据。要么在流程中包含备份,要么不实际重置主题。
主题重命名还不存在——但是您可以使用将要使用的相同方法。不要直接处理主题名——使用字典将虚拟主题名Map到实际的主题名。
然后,不是每次都“重置”Kafka,而是创建所有主题的新版本,并更新字典以将虚拟主题名称Map到新创建的主题版本。