我有一个使用kafka流中ktable的单示例java应用程序。直到最近,我还可以使用ktable检索所有数据,但突然有些消息似乎消失了。那里应该有~33k条带有唯一密钥的消息。
当我想通过键检索消息时,我没有得到一些消息。我使用readonlykeyvaluestore检索邮件:
final ReadOnlyKeyValueStore<GenericRecord, GenericRecord> store = ((KafkaStreams)streams).store(storeName, QueryableStoreTypes.keyValueStore());
store.get(key);
这些是我为kafkastreams设置的配置设置。
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverId);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
Kafka:0.10.2.0-cp1
汇合:3.2.0
调查给我带来了一些非常令人担忧的见解。我使用rest代理手动读取分区,发现一些偏移量返回错误。
请求: /topics/{topic}/partitions/{partition}/messages?offset={offset}
```
{
"error_code": 50002,
"message": "Kafka error: Fetch response contains an error code: 1"
}
没有客户端,java和命令行都不会返回任何错误。它们只是跳过错误的丢失消息,导致ktables中的数据丢失。一切都很好,没有任何通知,似乎不知何故,一些信息变得腐败。
我有两个代理,所有主题的复制因子都是2,并且完全复制。两个经纪人分别返回相同的。重启经纪人也没什么区别。
原因可能是什么?
如何在客户身上发现这种情况?
1条答案
按热度按时间xiozqbni1#
默认情况下,kafka代理配置密钥
cleanup.policy
设置为delete
. 设置为compact
保留每个键的最新消息。参见压实。删除旧邮件不会更改最小偏移量,因此尝试在其下方检索邮件会导致错误。这个错误很模糊。kafka streams客户机将从最小偏移量开始读取消息,因此没有错误。唯一可见的影响是ktables中缺少数据。
当应用程序由于缓存而运行时,即使从kafka本身删除了消息,所有数据可能仍然可用。它们会在清理后消失。