在Spring听许多Kafka的音乐

e0uiprwp  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(333)

我正在开发一个事件驱动架构的应用程序。
我试图模拟以下事件流:
useraccountcreated(用户管理事件)->发送电子邮件->mailnotificationsent(通知服务事件)
通知服务应用程序执行整个流程。它通过侦听用户管理事件主题来等待useraccountcreated事件。当接收到事件时,应用程序发送电子邮件并发布一个新的事件—mailnotificationsent到notification service事件主题。
我在监听第一个事件(useraccountcreated)方面没有问题—应用程序接收它并执行流的其余部分。我对发布mailnotificationsent事件也没有问题。不幸的是,出于开发目的,我希望侦听通知服务中的mailnotificationsent事件,因此应用程序必须同时侦听useraccountcreated和mailnotificationsent。在这里我不能让它工作。
让我们看一下实现:
通知流:

public interface NotificationStreams {

    String INPUT = "notification-service-events-in";
    String OUTPUT = "notification-service-events-out";

    @Input(INPUT)
    SubscribableChannel inboundEvents();

    @Output(OUTPUT)
    MessageChannel outboundEvents();
}

通知第七名听众:

@Slf4j
@Component
@RequiredArgsConstructor
public class NotificationEventsListener {

    @StreamListener(NotificationStreams.INPUT)
    public void notificationServiceEventsIn(Flux<ActivationLinkSent> input) {
        input.subscribe(event -> {
            log.info("Received event ActivationLinkSent: " + event.toString());
        });
    }
}

用户管理事件:

public interface UserManagementEvents {

    String INPUT = "user-management-events";

    @Input(INPUT)
    SubscribableChannel inboundEvents();
}

用户管理事件侦听器:

@Slf4j
@Component
@RequiredArgsConstructor
public class UserManagementEventsListener {

    private final Gate gate;

    @StreamListener(UserManagementEvents.INPUT)
    public void userManagementEvents(Flux<UserAccountCreated> input) {
        input.subscribe(event -> {
            log.info("Received event UserAccountCreated: " + event.toString());
            gate.dispatch(SendActivationLink.builder()
                .email(event.getEmail())
                .username(event.getUsername())
                .build()
            );
        });
    }
}

kafkastreamsconfig公司:

@EnableBinding(value = {NotificationStreams.class, UserManagementEvents.class})
public class KafkaStreamsConfig {
}

事件发布者:

@Slf4j
@RequiredArgsConstructor
@Component
public class EventPublisher {

    private final NotificationStreams eventsStreams;

    private final AvroMessageBuilder messageBuilder;

    public void publish(Event event) {
        MessageChannel messageChannel = eventsStreams.outboundEvents();

        AvroActivationLinkSent activationLinkSent = new AvroActivationLinkSent();                         activationLinkSent.setEmail(((ActivationLinkSent)event).getEmail());
        activationLinkSent.setUsername(((ActivationLinkSent)event).getUsername() + "-domain");
        activationLinkSent.setTimestamp(System.currentTimeMillis());
        messageChannel.send(messageBuilder.buildMessage(activationLinkSent));
    }
}

应用程序配置:

spring:
  devtools:
    restart:
      enabled: true
  cloud:
    stream:
      default:
        contentType: application/*+avro
      kafka:
        binder:
          brokers: localhost:9092
      schemaRegistryClient:
        endpoint: http://localhost:8990
  kafka:
    consumer:
      group-id: notification-group
      auto-offset-reset: earliest
kafka:
  bootstrap:
    servers: localhost:9092

应用程序似乎忽略了通知服务事件侦听器。它在只听一个流时工作。
我几乎100%确定这不是发布事件的问题,因为我已手动连接到kafka并验证消息是否正确发布:

kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic notification-service-events-out --from-beginning

你知道我还应该查什么吗?Spring侧是否有其他配置?

pzfprimi

pzfprimi1#

我找到了问题所在。我缺少绑定配置。在应用程序属性中,我应该添加以下行:

cloud:
 stream:
  bindings:
    notification-service-events-in:
      destination: notification-service-events
    notification-service-events-out:
      destination: notification-service-events
    user-management-events-in:
      destination: user-management-events

在用户管理服务中,我没有遇到这样的问题,因为我使用了不同的属性:

cloud:
   stream:
    default:
      contentType: application/*+avro
      destination: user-management-events

相关问题