Kafka清理政策“COMPACT”:邮件未从主题中清除

igetnqfo  于 2023-08-02  发布在  Apache
关注(0)|答案(1)|浏览(108)

我有2个主题topic1和topic2,我正在将消息生成到测试主题中
主题1的配置:

Topic: topic1     TopicId: <topic_ID> PartitionCount: 1       ReplicationFactor: 1    Configs: min.insync.replicas=1,cleanup.policy=delete,retention.ms=24192000000,retention.bytes=-1
        Topic: topic1     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

字符串
主题2的配置:

Topic: topic2    TopicId: <topic_ID> PartitionCount: 1       ReplicationFactor: 1    Configs: min.insync.replicas=1,cleanup.policy=compact,retention.ms=60000,retention.bytes=-1
        Topic: topic2     Partition: 0    Leader: 0       Replicas: 0     Isr: 0


测试主题中的消息如下所示:
消息1:
关键字:

{
    "COl1": {
        "string": "dummy001"
    }
}


数值:

{
    "data": {
        "Col1": "dummy001",
        "Col2": "2023-01-01 01:01:01",
        "Col3": "1000.100"
    },
    "operation": "INSERT",
    "timestamp": "2023-01-01 01:01:01"
}


消息2:
关键字:

{
    "COl1": {
        "string": "dummy001"
    }
}


数值:

{
    "data": {
        "Col1": "dummy001",
        "Col2": "2023-02-02 02:02:02",
        "Col3": "2000.100"
    },
    "operation": "UPDATE",
    "timestamp": "2023-02-02 02:02:02"
}


我已经创建了一个Kafka流“stream1”来从topic1中提取我的数据。下面是stream1中的数据:

+--------------------------------------------------------------------------------------------------------------------------------------------------+
|DATA                                                                                                                                              |
+--------------------------------------------------------------------------------------------------------------------------------------------------+
|{COL1=dummy001, COL2=2023-01-01 01:01:01, COL3=1000.100}                                                                                          |
|{COL1=dummy001, COL2=2023-02-02 02:02:02, COL3=2000.100}


我已经创建了另一个Kafka流“stream2”,并将col1定义为Key列。

ksql> create stream stream2 (col1 string key, col2 string, col3 string) with(kafka_topic='topic2', value_format='AVRO');


然后,我将值从stream1插入stream2

ksql> insert into stream2 select data->COL1 as COL1, data->COL2 as COL2, data->COl3 as COL3  from stream1 partition by data->col1;


我在topic2中的数据如下:
消息1:密钥:
虚拟001
数值:

{
    "COL2": {
        "string": "2023-01-01 01:01:01"
    },
    "COL3": {
        "string": "1000.100"
    }
}


消息2:密钥:dummy001值:

{
    "COL2": {
        "string": "2023-02-02 02:02:02"
    },
    "COL3": {
        "string": "2000.100"
    }
}


我所面临的问题是符合清理政策“紧凑”。虽然我已经设置了清理策略来压缩我的邮件没有被删除。正如我在上面提到的,我是如何使用stream1和stream2从topic1处理到topic2的。即使在将键列定义为col1之后,清理策略也不起作用。当我将清理策略设置为“COMPACT,DELETE”时,它会删除所有邮件。
根据我的理解,由于消息有一个关键列,它应该保持最新的消息,因为每个消息的关键列,并清除旧的消息。但这不管用。
如果我错过了什么,请告诉我。
我试图实现的是在主题2中的任何给定时间点,按照col1保持最新消息。

inkz8wg9

inkz8wg91#

您设置的策略不保证它将在何时运行。因此,这并不意味着每次发布相同的密钥时都会运行压缩(这将是低效的)。
消费者需要维护他们对数据的视图的“最新”状态。

相关问题