保留主题中尚未处理的数据

Retain data in topic that hasn't been processed yet

我已经设置了 tweepy 来获取推文并写入主题 TWEEPY_TOPIC 以及从主题中读取的流。

-- Create topic for tweepy to write into
CREATE STREAM TWEEPY_STREAM (
    id BIGINT,
    lang VARCHAR,
    tweet VARCHAR,
    user STRUCT<id BIGINT,
                screen_name VARCHAR>)
    WITH (
        KAFKA_TOPIC= 'TWEEPY_TOPIC',
        VALUE_FORMAT = 'AVRO'
        );

还有另一个流从上面的流中读取并将其写入另一个主题(使用 kafka-connect 推送到弹性搜索)。

-- Create another topic with ML data.
-- GETSENTIMENT and GETFOURCLASS are custom ksql functions
CREATE STREAM ELASTIC_STREAM
WITH (
    KAFKA_TOPIC = 'ELASTIC_TOPIC',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 1, REPLICAS = 1
)
AS SELECT 
    id,
    lang,
    tweet,
    user,
    GETSENTIMENT(tweet) as sentiment,
    GETFOURCLASS(tweet) as fourclass
FROM TWEEPY_STREAM;

用户定义函数 GETSENTIMENTGETFOURCLASS 向 returns 分类的 python 模型服务器发出 POST 请求。这些 API 响应目前需要接近 0.5-1 秒。

我担心的是,如果第一个主题 TWEEPY_TOPIC 中的数据在默认保留期(7 天)后被清除,它不会被 ELASTIC_STREAM 提取。有没有办法设置某种标志来告诉kafka不要删除尚未处理的数据?我也愿意接受重新设计的建议。

Kafka 没有只删除已消费消息的清理策略。

另一种方法是使用压缩主题。压缩主题具有不同的清理策略,并保留所有唯一键的最新消息。

消息被使用后,您可以将新消息发送到具有空值的压缩主题。这将该消息标记为逻辑删除,并将在下一个压缩周期中被日志清理器清理(删除)。