我是Kafka的新人。我们正在尝试将csv文件中的数据导入kafka。我们需要每天导入数据,同时前一天的数据也被弃用了。如何删除python中kafka主题下的所有消息?或者我如何删除python中的kafka主题?或者我看到有人建议等待数据过期,如果可能的话,我如何设置数据过期时间?如有任何建议,我们将不胜感激!谢谢
mkshixfv1#
不能删除Kafka主题中的消息。你可以:套 log.retention.* 基本上是消息过期的属性。您可以选择基于时间的过期(例如。g。保留6小时前或更新后的邮件)或基于空间的过期邮件(例如。g。最多保留1 gb的消息)。请参阅代理配置并搜索保留。可以为不同的主题设置不同的值。删除整个主题。这有点棘手,我不建议这样。为每天创建一个新主题。类似于my-topic-2015-09-21。但我认为你根本不需要删除主题中的信息。因为你的Kafka消费者会跟踪已经处理过的消息。因此,当您阅读今天的所有消息时,kafka consumer会保存这些信息,而您明天只会阅读新消息。另一个可能的解决方案是原木压缩。但这更复杂,可能不是你需要的。基本上,您可以为Kafka主题中的每条消息设置一个键。如果您用同一密钥发送两条不同的消息,kafka将只保留主题中最新的消息,并删除所有使用同一密钥的旧消息。你可以把它看作是一种“键值存储”。每个具有相同密钥的消息只更新特定密钥下的一个值。但是,嘿,你真的不需要这个,仅供参考:-)。
log.retention.*
kt06eoxx2#
最简单的方法是删除主题。我在python自动化测试套件中使用了这一点,我想验证通过kafka发送的一组特定的测试消息,不想看到以前测试运行的结果
def delete_kafka_topic(topic_name): call(["/usr/bin/kafka-topics", "--zookeeper", "zookeeper-1:2181", "--delete", "--topic", topic_name])
2条答案
按热度按时间mkshixfv1#
不能删除Kafka主题中的消息。你可以:
套
log.retention.*
基本上是消息过期的属性。您可以选择基于时间的过期(例如。g。保留6小时前或更新后的邮件)或基于空间的过期邮件(例如。g。最多保留1 gb的消息)。请参阅代理配置并搜索保留。可以为不同的主题设置不同的值。删除整个主题。这有点棘手,我不建议这样。
为每天创建一个新主题。类似于my-topic-2015-09-21。
但我认为你根本不需要删除主题中的信息。因为你的Kafka消费者会跟踪已经处理过的消息。因此,当您阅读今天的所有消息时,kafka consumer会保存这些信息,而您明天只会阅读新消息。
另一个可能的解决方案是原木压缩。但这更复杂,可能不是你需要的。基本上,您可以为Kafka主题中的每条消息设置一个键。如果您用同一密钥发送两条不同的消息,kafka将只保留主题中最新的消息,并删除所有使用同一密钥的旧消息。你可以把它看作是一种“键值存储”。每个具有相同密钥的消息只更新特定密钥下的一个值。但是,嘿,你真的不需要这个,仅供参考:-)。
kt06eoxx2#
最简单的方法是删除主题。我在python自动化测试套件中使用了这一点,我想验证通过kafka发送的一组特定的测试消息,不想看到以前测试运行的结果