我已经设置tweepy来获取tweets,并向topic tweepy\u topic和一个从topic读取的流写入tweepy\u topic。
-- Create topic for tweepy to write into
CREATE STREAM TWEEPY_STREAM (
id BIGINT,
lang VARCHAR,
tweet VARCHAR,
user STRUCT<id BIGINT,
screen_name VARCHAR>)
WITH (
KAFKA_TOPIC= 'TWEEPY_TOPIC',
VALUE_FORMAT = 'AVRO'
);
还有另一个流从上面的流中读取并将其写入另一个主题(使用kafka connect将其推送到ElasticSearch)。
-- Create another topic with ML data.
-- GETSENTIMENT and GETFOURCLASS are custom ksql functions
CREATE STREAM ELASTIC_STREAM
WITH (
KAFKA_TOPIC = 'ELASTIC_TOPIC',
VALUE_FORMAT = 'AVRO',
PARTITIONS = 1, REPLICAS = 1
)
AS SELECT
id,
lang,
tweet,
user,
GETSENTIMENT(tweet) as sentiment,
GETFOURCLASS(tweet) as fourclass
FROM TWEEPY_STREAM;
用户定义函数 GETSENTIMENT
以及 GETFOURCLASS
向python模型服务器发出post请求,后者返回分类。这些api响应目前需要接近0.5-1秒的时间。
我担心的是如果第一个主题中的数据 TWEEPY_TOPIC
如果在默认保留期(7天)之后清除,则不会被 ELASTIC_STREAM
. 有没有办法设置某种标志,告诉Kafka不要删除尚未处理的数据?我也愿意接受重新设计的建议。
1条答案
按热度按时间gwo2fgha1#
Kafka没有一个清理策略,只删除已消费的邮件。
另一种方法是使用压缩主题。压缩主题具有不同的清理策略,并且保留所有唯一密钥的最新消息。
一旦消息被使用,您就可以使用空值向压缩的主题发送一条新消息。这将该消息标记为逻辑删除,并将在下一个压缩周期中由日志清理器清理(删除)。