我正在开发一个事件驱动架构的应用程序。
我试图模拟以下事件流:
useraccountcreated(用户管理事件)->发送电子邮件->mailnotificationsent(通知服务事件)
通知服务应用程序执行整个流程。它通过侦听用户管理事件主题来等待useraccountcreated事件。当接收到事件时,应用程序发送电子邮件并发布一个新的事件—mailnotificationsent到notification service事件主题。
我在监听第一个事件(useraccountcreated)方面没有问题—应用程序接收它并执行流的其余部分。我对发布mailnotificationsent事件也没有问题。不幸的是,出于开发目的,我希望侦听通知服务中的mailnotificationsent事件,因此应用程序必须同时侦听useraccountcreated和mailnotificationsent。在这里我不能让它工作。
让我们看一下实现:
通知流:
public interface NotificationStreams {
String INPUT = "notification-service-events-in";
String OUTPUT = "notification-service-events-out";
@Input(INPUT)
SubscribableChannel inboundEvents();
@Output(OUTPUT)
MessageChannel outboundEvents();
}
通知第七名听众:
@Slf4j
@Component
@RequiredArgsConstructor
public class NotificationEventsListener {
@StreamListener(NotificationStreams.INPUT)
public void notificationServiceEventsIn(Flux<ActivationLinkSent> input) {
input.subscribe(event -> {
log.info("Received event ActivationLinkSent: " + event.toString());
});
}
}
用户管理事件:
public interface UserManagementEvents {
String INPUT = "user-management-events";
@Input(INPUT)
SubscribableChannel inboundEvents();
}
用户管理事件侦听器:
@Slf4j
@Component
@RequiredArgsConstructor
public class UserManagementEventsListener {
private final Gate gate;
@StreamListener(UserManagementEvents.INPUT)
public void userManagementEvents(Flux<UserAccountCreated> input) {
input.subscribe(event -> {
log.info("Received event UserAccountCreated: " + event.toString());
gate.dispatch(SendActivationLink.builder()
.email(event.getEmail())
.username(event.getUsername())
.build()
);
});
}
}
kafkastreamsconfig公司:
@EnableBinding(value = {NotificationStreams.class, UserManagementEvents.class})
public class KafkaStreamsConfig {
}
事件发布者:
@Slf4j
@RequiredArgsConstructor
@Component
public class EventPublisher {
private final NotificationStreams eventsStreams;
private final AvroMessageBuilder messageBuilder;
public void publish(Event event) {
MessageChannel messageChannel = eventsStreams.outboundEvents();
AvroActivationLinkSent activationLinkSent = new AvroActivationLinkSent(); activationLinkSent.setEmail(((ActivationLinkSent)event).getEmail());
activationLinkSent.setUsername(((ActivationLinkSent)event).getUsername() + "-domain");
activationLinkSent.setTimestamp(System.currentTimeMillis());
messageChannel.send(messageBuilder.buildMessage(activationLinkSent));
}
}
应用程序配置:
spring:
devtools:
restart:
enabled: true
cloud:
stream:
default:
contentType: application/*+avro
kafka:
binder:
brokers: localhost:9092
schemaRegistryClient:
endpoint: http://localhost:8990
kafka:
consumer:
group-id: notification-group
auto-offset-reset: earliest
kafka:
bootstrap:
servers: localhost:9092
应用程序似乎忽略了通知服务事件侦听器。它在只听一个流时工作。
我几乎100%确定这不是发布事件的问题,因为我已手动连接到kafka并验证消息是否正确发布:
kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic notification-service-events-out --from-beginning
你知道我还应该查什么吗?Spring侧是否有其他配置?
1条答案
按热度按时间pzfprimi1#
我找到了问题所在。我缺少绑定配置。在应用程序属性中,我应该添加以下行:
在用户管理服务中,我没有遇到这样的问题,因为我使用了不同的属性: