我有2个主题topic1和topic2,我正在将消息生成到测试主题中
主题1的配置:
Topic: topic1 TopicId: <topic_ID> PartitionCount: 1 ReplicationFactor: 1 Configs: min.insync.replicas=1,cleanup.policy=delete,retention.ms=24192000000,retention.bytes=-1
Topic: topic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
字符串
主题2的配置:
Topic: topic2 TopicId: <topic_ID> PartitionCount: 1 ReplicationFactor: 1 Configs: min.insync.replicas=1,cleanup.policy=compact,retention.ms=60000,retention.bytes=-1
Topic: topic2 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
型
测试主题中的消息如下所示:
消息1:
关键字:
{
"COl1": {
"string": "dummy001"
}
}
型
数值:
{
"data": {
"Col1": "dummy001",
"Col2": "2023-01-01 01:01:01",
"Col3": "1000.100"
},
"operation": "INSERT",
"timestamp": "2023-01-01 01:01:01"
}
型
消息2:
关键字:
{
"COl1": {
"string": "dummy001"
}
}
型
数值:
{
"data": {
"Col1": "dummy001",
"Col2": "2023-02-02 02:02:02",
"Col3": "2000.100"
},
"operation": "UPDATE",
"timestamp": "2023-02-02 02:02:02"
}
型
我已经创建了一个Kafka流“stream1”来从topic1中提取我的数据。下面是stream1中的数据:
+--------------------------------------------------------------------------------------------------------------------------------------------------+
|DATA |
+--------------------------------------------------------------------------------------------------------------------------------------------------+
|{COL1=dummy001, COL2=2023-01-01 01:01:01, COL3=1000.100} |
|{COL1=dummy001, COL2=2023-02-02 02:02:02, COL3=2000.100}
型
我已经创建了另一个Kafka流“stream2”,并将col1定义为Key列。
ksql> create stream stream2 (col1 string key, col2 string, col3 string) with(kafka_topic='topic2', value_format='AVRO');
型
然后,我将值从stream1插入stream2
ksql> insert into stream2 select data->COL1 as COL1, data->COL2 as COL2, data->COl3 as COL3 from stream1 partition by data->col1;
型
我在topic2中的数据如下:
消息1:密钥:
虚拟001
数值:
{
"COL2": {
"string": "2023-01-01 01:01:01"
},
"COL3": {
"string": "1000.100"
}
}
型
消息2:密钥:dummy001值:
{
"COL2": {
"string": "2023-02-02 02:02:02"
},
"COL3": {
"string": "2000.100"
}
}
型
我所面临的问题是符合清理政策“紧凑”。虽然我已经设置了清理策略来压缩我的邮件没有被删除。正如我在上面提到的,我是如何使用stream1和stream2从topic1处理到topic2的。即使在将键列定义为col1之后,清理策略也不起作用。当我将清理策略设置为“COMPACT,DELETE”时,它会删除所有邮件。
根据我的理解,由于消息有一个关键列,它应该保持最新的消息,因为每个消息的关键列,并清除旧的消息。但这不管用。
如果我错过了什么,请告诉我。
我试图实现的是在主题2中的任何给定时间点,按照col1保持最新消息。
1条答案
按热度按时间inkz8wg91#
您设置的策略不保证它将在何时运行。因此,这并不意味着每次发布相同的密钥时都会运行压缩(这将是低效的)。
消费者需要维护他们对数据的视图的“最新”状态。