为什么kafka ktable缺少条目?

ma8fv8wu  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(354)

我有一个使用kafka流中ktable的单示例java应用程序。直到最近,我还可以使用ktable检索所有数据,但突然有些消息似乎消失了。那里应该有~33k条带有唯一密钥的消息。
当我想通过键检索消息时,我没有得到一些消息。我使用readonlykeyvaluestore检索邮件:

final ReadOnlyKeyValueStore<GenericRecord, GenericRecord> store = ((KafkaStreams)streams).store(storeName, QueryableStoreTypes.keyValueStore());
store.get(key);

这些是我为kafkastreams设置的配置设置。

final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverId);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

Kafka:0.10.2.0-cp1
汇合:3.2.0
调查给我带来了一些非常令人担忧的见解。我使用rest代理手动读取分区,发现一些偏移量返回错误。
请求: /topics/{topic}/partitions/{partition}/messages?offset={offset} ```
{
"error_code": 50002,
"message": "Kafka error: Fetch response contains an error code: 1"
}

没有客户端,java和命令行都不会返回任何错误。它们只是跳过错误的丢失消息,导致ktables中的数据丢失。一切都很好,没有任何通知,似乎不知何故,一些信息变得腐败。
我有两个代理,所有主题的复制因子都是2,并且完全复制。两个经纪人分别返回相同的。重启经纪人也没什么区别。
原因可能是什么?
如何在客户身上发现这种情况?
xiozqbni

xiozqbni1#

默认情况下,kafka代理配置密钥 cleanup.policy 设置为 delete . 设置为 compact 保留每个键的最新消息。参见压实。
删除旧邮件不会更改最小偏移量,因此尝试在其下方检索邮件会导致错误。这个错误很模糊。kafka streams客户机将从最小偏移量开始读取消息,因此没有错误。唯一可见的影响是ktables中缺少数据。
当应用程序由于缓存而运行时,即使从kafka本身删除了消息,所有数据可能仍然可用。它们会在清理后消失。

相关问题