在尚未处理的主题中保留数据

eit6fx6z  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(215)

我已经设置tweepy来获取tweets,并向topic tweepy\u topic和一个从topic读取的流写入tweepy\u topic。

-- Create topic for tweepy to write into
CREATE STREAM TWEEPY_STREAM (
    id BIGINT,
    lang VARCHAR,
    tweet VARCHAR,
    user STRUCT<id BIGINT,
                screen_name VARCHAR>)
    WITH (
        KAFKA_TOPIC= 'TWEEPY_TOPIC',
        VALUE_FORMAT = 'AVRO'
        );

还有另一个流从上面的流中读取并将其写入另一个主题(使用kafka connect将其推送到ElasticSearch)。

-- Create another topic with ML data.
-- GETSENTIMENT and GETFOURCLASS are custom ksql functions
CREATE STREAM ELASTIC_STREAM
WITH (
    KAFKA_TOPIC = 'ELASTIC_TOPIC',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 1, REPLICAS = 1
)
AS SELECT 
    id,
    lang,
    tweet,
    user,
    GETSENTIMENT(tweet) as sentiment,
    GETFOURCLASS(tweet) as fourclass
FROM TWEEPY_STREAM;

用户定义函数 GETSENTIMENT 以及 GETFOURCLASS 向python模型服务器发出post请求,后者返回分类。这些api响应目前需要接近0.5-1秒的时间。
我担心的是如果第一个主题中的数据 TWEEPY_TOPIC 如果在默认保留期(7天)之后清除,则不会被 ELASTIC_STREAM . 有没有办法设置某种标志,告诉Kafka不要删除尚未处理的数据?我也愿意接受重新设计的建议。

gwo2fgha

gwo2fgha1#

Kafka没有一个清理策略,只删除已消费的邮件。
另一种方法是使用压缩主题。压缩主题具有不同的清理策略,并且保留所有唯一密钥的最新消息。
一旦消息被使用,您就可以使用空值向压缩的主题发送一条新消息。这将该消息标记为逻辑删除,并将在下一个压缩周期中由日志清理器清理(删除)。

相关问题