Duplicate message handling with Spring Cloud Stream and Kafka

flvlnr44 · posted 2021-06-07 in Kafka

I am using Spring Cloud Stream with the Kafka binder. It works fine, but the client receives duplicate messages. I have already tried all the Kafka consumer properties, with no result.
Look at the two classes in my sample application - AggregateApplication and EventFilterApplication. If I run EventFilterApplication alone, there is only 1 message; with AggregateApplication there are 2 identical messages.
Here is my code:
1) Aggregator

import com.example.EventFilterApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder;

@SpringBootApplication
public class AggregateApplication {
    public static void main(String[] args) {
        new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args)
            .from(EventFilterApplication.class)
            .run(args);
    }
}

2) EventFilterApplication

import java.util.Arrays;
import java.util.Date;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.SubscribableChannel;

@SpringBootApplication
@EnableBinding(EventFilterApplication.LiveProcessor.class)
public class EventFilterApplication {

    @Autowired
    LiveProcessor source;

    @StreamListener(LiveProcessor.INPUT)
    public void handle(byte[] event) {
        try {
            System.out.println(new Date().getTime() + ": event was processed:" + Arrays.toString(event));
        } catch (Exception e) {
            System.out.println(String.format("Error={%s} on processing message=%s", e.getMessage(), Arrays.toString(event)));
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(EventFilterApplication.class, args);
    }

    interface LiveProcessor extends Source {

        String INPUT = "liveSource";

        @Input(INPUT)
        SubscribableChannel input();
    }
}

3) application.yml

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: kafka-broker.example.com:9092
          defaultBrokerPort: 9092
          defaultZkPort: 2181
          zkNodes: kafka-zookeeper.example.com
      type: kafka
      bindings:
        liveSource:
          binder: kafka
          consumer:
            headerMode: raw
            autoCommitOffset: true
          destination: topic_example_name
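
For reference, the binding above does not declare a consumer group. With the Kafka binder, a binding without a group joins an anonymous group and receives every published message, so each additional anonymous consumer gets its own copy. A minimal sketch of declaring a group, assuming the binding keeps the name liveSource (the group name example-group is only an illustration):

spring:
  cloud:
    stream:
      bindings:
        liveSource:
          destination: topic_example_name
          # consumers sharing a group split the topic's partitions,
          # so each message is handled by only one member of the group
          group: example-group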

4) build.gradle

buildscript {
    ext { springBootVersion = '1.4.2.RELEASE' }
    repositories {
        jcenter()
        maven { url 'http://repo.spring.io/plugins-release' }
    }
    dependencies {
        classpath("org.springframework.build.gradle:propdeps-plugin:0.0.7")
        classpath("org.springframework.boot:spring-boot-gradle-plugin:$springBootVersion")
        classpath("io.spring.gradle:dependency-management-plugin:0.5.2.RELEASE")
    }
}

ext['logstashLogbackEncoderV'] = '4.8'
ext['springCloudV'] = 'Camden.SR1'
ext['springCloudStreamV'] = 'Brooklyn.SR2'
ext['springIntegrationKafkaV'] = '1.3.1.RELEASE'

subprojects {
    apply plugin: 'java'
    apply plugin: 'propdeps'
    apply plugin: 'propdeps-idea'
    apply plugin: "io.spring.dependency-management"

    sourceCompatibility = 1.8

    dependencyManagement {
        imports {
            mavenBom "org.springframework.cloud:spring-cloud-dependencies:Camden.SR1"
            mavenBom "org.springframework.cloud:spring-cloud-stream-dependencies:Brooklyn.SR2"
            mavenBom "org.springframework.cloud.stream.app:spring-cloud-stream-app-dependencies:1.0.4.RELEASE"
        }
    }

    dependencies {
        compile("org.springframework.boot:spring-boot-starter-web:$springBootVersion") {
            exclude module: "spring-boot-starter-tomcat"
            exclude group: 'log4j'
        }

        compile("org.springframework.cloud:spring-cloud-starter-stream-kafka")

        compile("org.springframework.integration:spring-integration-kafka:$springIntegrationKafkaV") {
            exclude group: "org.slf4j"
        }

        compile("org.springframework.cloud:spring-cloud-stream:")

        compile("org.springframework.cloud:spring-cloud-starter-sleuth")

        compile("net.logstash.logback:logstash-logback-encoder:${logstashLogbackEncoderV}")

        testCompile("org.springframework.boot:spring-boot-starter-test:$springBootVersion") {
            exclude group: "org.slf4j"
        }
    }
}

x8goxv8g 1#

The duplication is caused by adding EventFilterApplication as the parent root:

public class AggregateApplication {
    public static void main(String[] args) {
        new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args)
            .from(EventFilterApplication.class)
            .run(args);
    }
}

This most likely creates two subscriptions. Instead of adding EventFilterApplication as the root, you can simply do the following:

public class AggregateApplication {
    public static void main(String[] args) {
        new AggregateApplicationBuilder(args)
            .from(EventFilterApplication.class)
            // rest of the pipeline
            .run(args);
    }
}
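
Here, "// rest of the pipeline" stands for whatever downstream stages the aggregate is meant to chain together. A rough sketch of what a fuller pipeline could look like using the builder's from/via/to chain; SomeProcessor and SomeSink are hypothetical @EnableBinding applications, not classes from the question:

public class AggregateApplication {
    public static void main(String[] args) {
        new AggregateApplicationBuilder(args)
            .from(EventFilterApplication.class)   // source stage
            .via(SomeProcessor.class)             // hypothetical processor stage
            .to(SomeSink.class)                   // hypothetical sink stage
            .run(args);
    }
}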

If you don't need to create an aggregate, this is sufficient:

public static void main(String[] args) {
    SpringApplication.run(EventFilterApplication.class, args);
}

Edit: added an extra example and clarified the answer.
