React堆Kafka:无论计算机的CPU数量如何,消息消耗始终在一个线程上

mjqavswn  于 2023-02-11  发布在  Apache
关注(0)|答案(1)|浏览(109)

关于Kafka的小问题。
我有一个非常简单的KafkaReact堆项目。

package com.example.micrometer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.function.Consumer;

@SpringBootApplication
public class StreamReactiveConsumerApplication implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(StreamReactiveConsumerApplication.class);

    public static void main(String... args) {
        new SpringApplicationBuilder(StreamReactiveConsumerApplication.class).run(args);
    }

    @Override
    public void run(String... args) {
    }

    @Bean
    Consumer<Flux<Message<String>>> consume() {
        return flux -> flux.flatMap(one -> myHandle(one) ).subscribe();
    }

    private Mono<String> myHandle(Message<String> one) {
        log.info("<==== look at this thread" + "\u001B[32m" + one.getPayload() + "\u001B[0m");
        String payload = one.getPayload();
        String decryptedPayload = complexInMemoryDecryption(payload); //this is NON blocking, takes 1 second
        String complexMatrix = convertDecryptedPayloadToGiantMatrix(decryptedPayload);  //this is NON blocking, takes 1 second
        String newMatrix = matrixComputation(complexMatrix); //this is NON blocking, takes 1 second
        return myNonBlockingReactiveRepository.save(complexMatrix);
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>streamreactiveconsumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.2</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2022.0.1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

(Note,它不是Spring的Kafka项目,也不是Spring的云流项目)
我正在使用一个有3个分区的主题。发送消息的速率是每秒一条消息。
每条消息的消费和处理需要30秒。
重要提示:请注意处理不包含任何阻塞操作。它是一个巨大的内存解密+巨大的矩阵计算。它是BlockHound测试的NON阻塞。

    • 实际**:当我使用Reactor Kafka项目的消息时,整个使用仅发生在一个线程上。

所有操作都将在container-0-C-1上进行,并使用2个CPU、4个CPU、8个CPU的硬件进行测试

2023-02-06 10:42:59 8384 INFO  --- [KafkaConsumerDestination{consumerDestinationName='prod_audit_hdfs', partitions=3, dlqName='null'}.container-0-C-1] [stream-reactive-consumer,,] c.e.m.StreamReactiveConsumerApplication :
2023-02-06 10:42:59 8384 INFO  --- [KafkaConsumerDestination{consumerDestinationName='prod_audit_hdfs', partitions=3, dlqName='null'}.container-0-C-1] [stream-reactive-consumer,,] c.e.m.StreamReactiveConsumerApplication :
2023-02-06 10:42:59 8384 INFO  --- [KafkaConsumerDestination{consumerDestinationName='prod_audit_hdfs', partitions=3, dlqName='null'}.container-0-C-1] [stream-reactive-consumer,,] c.e.m.StreamReactiveConsumerApplication :
    • 预期**:我们从基于http webflux迁移到基于Kafka消费。业务逻辑一点也没有改变。

在Reactor Netty Spring webflux应用程序中,我们可以看到对应于reactor内核的多个线程正在进行处理,在具有多个内核的机器上,这可以很容易地跟上。

[or-http-epoll-1] [or-http-epoll-2] [or-http-epoll-3] [or-http-epoll-4]

只需在任何一个reactor-http-epoll-N之间切换即可进行处理。我可以看到,reactor-http-epoll-1处理第一条消息的复杂内存计算,reactor-http-epoll-3处理第二条消息的计算,以此类推...并行性很明显
我知道有办法"规模"这个应用程序,但这是一个问题,就React堆Kafka本身而言。
我希望消息可以被并行处理。某种容器-0-C-1用于第一条消息,容器-0-C-2用于第二条消息,等等......
我该怎么做?我错过了什么?
谢谢

t9aqgxwy

t9aqgxwy1#

通常在Kafka消费者中,将轮询周期与处理逻辑分开是一个好主意。KafkaConsumer也有原生的I/O线程。有时这种架构被称为“流水线消费者”。在这种架构中,轮询线程不断地从kafka获取记录,然后将它们“馈送”到某个有界缓冲区/队列(即ArrayBlockingQueueLinkedBlockingQueue)。在另一端,处理线程从队列中获取记录并处理它们。它允许将轮询逻辑与实现缓冲和后向压力的处理分离。
Reactor Kafka构建在KafkaConsumer API之上,并使用类似的架构实现带有后向压力的React流。KafkaReceiver提供轮询周期,默认情况下,在Schedulers.single线程上发布获取的记录。
现在,根据您的逻辑,您可以顺序或并行地处理数据和提交偏移。对于并发处理,请使用flatMap,默认情况下,它并行处理256条记录,并且可以使用concurrency参数进行控制。

kafkaReceiver.receive()
    .flatMap(rec -> proces(rec), concurrency)

如果添加日志记录,您将看到所有记录都在kafka-receiver-2上接收,但在不同的parallel-#线程上处理。

12:50:08.347  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-2, partition: 0
12:50:08.349  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-3, partition: 0
12:50:08.350  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-4, partition: 0
12:50:08.350  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-6, partition: 0
12:50:08.351  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-9, partition: 0
12:50:08.353  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-0, partition: 2
12:50:08.354  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-8, partition: 2
12:50:08.355  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-1, partition: 1
12:50:08.356  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-5, partition: 1
12:50:08.358  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-7, partition: 1
12:50:09.353  [parallel-3] INFO [c.e.d.KafkaConsumerTest] - process: value-2, partition: 0
12:50:09.353  [parallel-6] INFO [c.e.d.KafkaConsumerTest] - process: value-6, partition: 0
12:50:09.353  [parallel-4] INFO [c.e.d.KafkaConsumerTest] - process: value-3, partition: 0
12:50:09.353  [parallel-5] INFO [c.e.d.KafkaConsumerTest] - process: value-4, partition: 0
12:50:09.355  [parallel-7] INFO [c.e.d.KafkaConsumerTest] - process: value-9, partition: 0
12:50:09.360  [parallel-10] INFO [c.e.d.KafkaConsumerTest] - process: value-1, partition: 1
12:50:09.360  [parallel-9] INFO [c.e.d.KafkaConsumerTest] - process: value-8, partition: 2
12:50:09.360  [parallel-8] INFO [c.e.d.KafkaConsumerTest] - process: value-0, partition: 2
12:50:09.361  [parallel-11] INFO [c.e.d.KafkaConsumerTest] - process: value-5, partition: 1
12:50:09.361  [parallel-12] INFO [c.e.d.KafkaConsumerTest] - process: value-7, partition: 1

换句话说,这是设计好的,您不应该担心轮询逻辑,您可以通过增加flatMap的并行度来扩展处理。

相关问题