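I'm using Spring Cloud Stream with the Kafka binder. It works well, but the client receives duplicate messages. I have already tried all the Kafka consumer properties, with no result.
Check the two classes in my application sample: AggregateApplication and EventFilterApplication. If I run EventFilterApplication, I get only 1 message; if I run AggregateApplication, I get 2 identical messages.
Here is my code:
1) Aggregator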
import com.example.EventFilterApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder;

@SpringBootApplication
public class AggregateApplication {

    public static void main(String[] args) {
        new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args)
                .from(EventFilterApplication.class)
                .run(args);
    }
}
2) EventFilterApplication
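package com.example;

import java.util.Arrays;
import java.util.Date;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.SubscribableChannel;

@SpringBootApplication
@EnableBinding(EventFilterApplication.LiveProcessor.class)
public class EventFilterApplication {

    @Autowired
    LiveProcessor source;

    // Invoked once per subscription for every message on the "liveSource" binding.
    @StreamListener(LiveProcessor.INPUT)
    public void handle(byte[] event) {
        try {
            System.out.println(new Date().getTime() + ": event was processed:" + Arrays.toString(event));
        } catch (Exception e) {
            System.out.println(String.format("Error={%s} on processing message=%s", e.getMessage(), Arrays.toString(event)));
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(EventFilterApplication.class, args);
    }

    interface LiveProcessor extends Source {

        String INPUT = "liveSource";

        @Input(INPUT)
        SubscribableChannel input();
    }
}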
3) application.yml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: kafka-broker.example.com:9092
          defaultBrokerPort: 9092
          defaultZkPort: 2181
          zkNodes: kafka-zookeeper.example.com
          type: kafka
      bindings:
        liveSource:
          binder: kafka
          consumer:
            headerMode: raw
            autoCommitOffset: true
          destination: topic_example_name
4) build.gradle
buildscript {
    ext { springBootVersion = '1.4.2.RELEASE' }
    repositories {
        jcenter()
        maven { url 'http://repo.spring.io/plugins-release' }
    }
    dependencies {
        classpath("org.springframework.build.gradle:propdeps-plugin:0.0.7")
        classpath("org.springframework.boot:spring-boot-gradle-plugin:$springBootVersion")
        classpath("io.spring.gradle:dependency-management-plugin:0.5.2.RELEASE")
    }
}

ext['logstashLogbackEncoderV'] = '4.8'
ext['springCloudV'] = 'Camden.SR1'
ext['springCloudStreamV'] = 'Brooklyn.SR2'
ext['springIntegrationKafkaV'] = '1.3.1.RELEASE'

subprojects {
    apply plugin: 'java'
    apply plugin: 'propdeps'
    apply plugin: 'propdeps-idea'
    apply plugin: "io.spring.dependency-management"

    sourceCompatibility = 1.8

    dependencyManagement {
        imports {
            mavenBom "org.springframework.cloud:spring-cloud-dependencies:Camden.SR1"
            mavenBom "org.springframework.cloud:spring-cloud-stream-dependencies:Brooklyn.SR2"
            mavenBom "org.springframework.cloud.stream.app:spring-cloud-stream-app-dependencies:1.0.4.RELEASE"
        }
    }

    dependencies {
        compile("org.springframework.boot:spring-boot-starter-web:$springBootVersion") {
            exclude module: "spring-boot-starter-tomcat"
            exclude group: 'log4j'
        }
        compile("org.springframework.cloud:spring-cloud-starter-stream-kafka")
        compile("org.springframework.integration:spring-integration-kafka:$springIntegrationKafkaV") {
            exclude group: "org.slf4j"
        }
        compile("org.springframework.cloud:spring-cloud-stream:")
        compile("org.springframework.cloud:spring-cloud-starter-sleuth")
        compile("net.logstash.logback:logstash-logback-encoder:${logstashLogbackEncoderV}")
        testCompile("org.springframework.boot:spring-boot-starter-test:$springBootVersion") {
            exclude group: "org.slf4j"
        }
    }
}
1 Answer
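The duplication is caused by registering EventFilterApplication as the parent root in new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args): this most likely creates two subscriptions, because the same class is then also added through from(EventFilterApplication.class). Instead of adding EventFilterApplication as the root, you can simply do new AggregateApplicationBuilder(args).from(EventFilterApplication.class).run(args). If you don't need to create an aggregate, SpringApplication.run(EventFilterApplication.class, args) is enough.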
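To illustrate the suspected mechanism outside of Spring, here is a minimal plain-Java sketch. The Channel class and the deliver helper are hypothetical stand-ins, not Spring Cloud Stream classes; the sketch only assumes that each registration of the handler becomes an independent subscription on the same input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DuplicateSubscriptionDemo {

    /** Hypothetical stand-in for a subscribable channel; not a Spring class. */
    public static class Channel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        public void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        /** Delivers the message once to every registered subscriber. */
        public void send(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    /**
     * Registers the same handler the given number of times, sends one
     * message, and returns everything the handler received.
     */
    public static List<String> deliver(int registrations, String message) {
        Channel channel = new Channel();
        List<String> received = new ArrayList<>();
        for (int i = 0; i < registrations; i++) {
            channel.subscribe(received::add);
        }
        channel.send(message);
        return received;
    }

    public static void main(String[] args) {
        // One registration, as when EventFilterApplication runs on its own.
        System.out.println("one registration:  " + deliver(1, "event"));
        // Two registrations, as assumed when the class is both root and from(...).
        System.out.println("two registrations: " + deliver(2, "event"));
    }
}
```

With a single registration the handler sees the message once; with two it sees the same message twice, which matches the behaviour described in the question.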
Edit: added an additional example and clarified the answer.