使用springboot的kafka配置问题

dy1byipe  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(282)

我们的团队一直在尝试Kafka的问题。自从我们开始开发应用程序以来,这个问题就一直存在。
起初,这些问题很快就解决了。”它失败了?只需重新启动服务器”。既然我们想开始向公众发布我们的应用程序,这个“解决方案”就不再可行了。
我们面临的问题基本上有两个:

消费者停止工作

这是一个反复出现的问题。突然间,一些消费者就停了下来。消息被成功地发送到kafka,我们甚至可以使用kafka工具看到实际的消息,但是消费者就是不工作。

循环消息

这恰恰相反。有时会发送一条消息,而使用者会一直使用该消息,直到我们重新启动服务器。
我们尝试将kafka直接配置到服务器中,但是我们意识到,出于某种原因,kafka忽略了这些配置,直接从spring boot获取配置。
我们的配置如下:
消费者:

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_CONSUMER_GROUP);
    properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5000000);
    properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10800000);

制作人:

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.KAFKA_HOST);
    properties.put(ProducerConfig.RETRIES_CONFIG, 0);
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5000000);
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
    properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10800000);

您可以看到极端的超时和大小值,这是因为我们认为问题与消息的大小或服务器的超时有关。我们甚至重新设计了所有的应用程序流量,这样我们就可以发送更小的消息,这时我们才意识到这不是根本问题。
任何帮助都将不胜感激。
Kafka版本0.10
spring boot版本1.5.7 dalston.sr4
我们使用的是springcloudstarter流kafka依赖项。
我们检查了日志,但没有可识别的错误。事实上,这里只有信息信息,但没有一条是有用的。

hxzsmxv2

hxzsmxv21#

不幸的是,boot1.5引入了一个非常旧的springkafka版本(1.1.x),这个版本不再受支持。boot有严格的依赖版本控制规则。正如spring for apache kafka项目页面所述:
所有代理大于等于0.10.x.x的用户都建议使用SpringKafka版本1.3.x或更高版本,因为它的线程模型由于kip-62而更简单。
当前的1.3.x版本是1.3.5。尝试升级到该版本和0.11kafka客户机jar。
在1.1.x中,有一个复杂的逻辑,需要缓慢的侦听器暂停/恢复使用者,以避免代理重新平衡。虽然我自己没有看到过,但我也看到有报道称消费者在暂停后不能正常恢复。
由于kip-62,这种逻辑不再需要,因为心跳是在后台发送的。但是,您确实需要确保 max.poll.interval.ms 足够大以支持处理 max.poll.records .
如果可以的话,升级到boot2.0.4会更好,因为它引入了最新的springkafka2.1.7。

相关问题