获取大块和错过心跳

2g32fytz  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(242)

我正在研究一个spring引导服务,它从apachekafka读取消息,通过http从另一个服务请求消息指示的记录,处理它们,将一些数据保存到数据库中,并将结果发布到另一个主题。
这是通过

@StreamListener(Some.INPUT)
@SendTo(Some.OUTPUT)

这是在几个服务中完成的,通常工作得很好。唯一的属性集是

spring.cloud.stream.binder.consumer.concurrency=20

主题本身有20个分区,应该适合。
在监视kafka的读取时,我们发现吞吐量非常低,行为也很奇怪:
这个应用程序一次最多可以读取500条信息,然后是1-2分钟的空白。在这段时间内,使用者反复记录它是“丢失心跳,因为分区被重新平衡”、“重新分配分区”,有时甚至抛出异常,说它“未能提交,因为轮询间隔已过”
我们得出的结论是,这意味着使用者获取500条消息,处理所有消息需要很长时间,错过了它的时间窗口,因此无法将500条消息中的任何一条提交给代理—代理将重新分配分区并重新发送相同的消息。
在浏览了线程和文档之后,我找到了“max.poll.records”属性,但在设置该属性的位置上存在冲突。
有人说要把它放在下面

spring.cloud.stream.bindings.consumer.<input>.configuration

有人说

spring.cloud.stream.kafka.binders.consumer-properties

我尝试将两者都设置为1,但服务行为没有改变。
我如何正确地处理这样一种情况,即消费者无法使用默认设置跟上所需的轮询间隔?
普通yaml:

spring.cloud.stream.default.group=${spring.application.name}

服务yaml

spring:
  clould:
    stream:
      default:
        consumer.headerMode: embeddedHeaders
        producer.headerMode: embeddedHeaders
      bindings:
       someOutput:
         destination: outTopic
       someInput:
         destination: inTopic
           consumer:
             concurrency: 30
      kafka:
        bindings:
          consumer:
            someInput:
              configuarion:
                max.poll.records: 20 # ConsumerConfig ignores this
              consumer:
                enableDlq: true
                configuarion:
                  max.poll.records: 30 # ConsumerConfig ignores this
          someInput:
            configuarion:
              max.poll.records: 20 # ConsumerConfig ignores this
            consumer:
              enableDlq: true
              configuarion:
                max.poll.records: 30 # ConsumerConfig ignores this
        binder:
          consumer-properties:
            max.poll.records: 10 # this gets used first
          configuration:
            max.poll.records: 40 # this get used when the first one is not present

“忽略此项”始终意味着,如果未设置其他属性,consumerconfiguration会将最大轮询记录的默认值保持为500
编辑:我们更接近了:
这个问题与spring retry设置了exponentialbackoffstrategy有关,还有一系列错误有效地停止了应用程序。
我不明白的是,我们强迫200个错误通过张贴错误的消息到主题的问题,这导致应用程序读取200,采取年龄(与旧的重试配置),然后提交所有200个错误一次。
如果我们有

max.poll.records: 1
concurrency: 1
ackEachRecod = true
enableDlq: true # (which implicitly makes autoCommitOffsets = true according to the docs)
9o685dep

9o685dep1#

是的

spring.cloud.stream.kafka.bindings.consumer.<input>.consumer.configuration.max.poll.records
.

请参阅文档。。。
Kafka消费地产
以下属性仅适用于kafka使用者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.consumer. ...
配置
使用包含通用kafka使用者属性的键/值对进行Map。
默认值:空Map。
...
你还可以增加 max.poll.interval.ms .
编辑
我刚刚用2.1.0.release进行了测试,它的工作原理与我描述的一样:
无设置

2019-03-01 08:47:59.560  INFO 44698 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 500
    ...

启动默认值

spring.kafka.consumer.properties.max.poll.records=42

2019-03-01 08:49:49.197  INFO 45044 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 42
    ...

活页夹默认值#1

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.consumer-properties.max.poll.records=43

2019-03-01 08:52:11.469  INFO 45842 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 43
    ...

活页夹默认值#2

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43

2019-03-01 08:54:06.211  INFO 46252 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 43
    ...

绑定默认值

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44

2019-03-01 09:02:26.004  INFO 47833 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 44
    ...

结合特异性

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45

2019-03-01 09:05:01.452  INFO 48330 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 45
    ...

编辑2
这是完整的测试应用程序。我只是在http://start.spring.io and 选择“Kafka”和“云流”。

@SpringBootApplication
@EnableBinding(Sink.class)
public class So54932453Application {

    public static void main(String[] args) {
        SpringApplication.run(So54932453Application.class, args).close();
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {

    }

}

spring.cloud.stream.bindings.input.group=so54932453

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>net.gprussell</groupId>
    <artifactId>so54932453</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>so54932453</name>
    <description>Demo</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>

</project>

相关问题