angular—如何使用connectable flux/server sent event/kafka处理不再侦听的订阅服务器

kxeu7u2r  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(258)

我的ui在angular9上,而我的后端使用spring-boot、webflux、reactor-core和kafka。我的想法是,当后端从kafka主题接收到事件时,我希望在ui中显示这些事件。我创造了这样一个可连接的通量:

@Bean
ConnectableFlux<ServerSentEvent<VinChangedEvent>> vinResponseKafkaEventPublisher(ReceiverOptions<String, VinChangedEvent> vinResponseKafkaReceiverOptions) {
    return KafkaReceiver.create(vinResponseKafkaReceiverOptions)
            .receive()
            .map(vinChangedEventReceiverRecord -> ServerSentEvent.builder(vinChangedEventReceiverRecord.value()).build())
            .publish();
}

我将这个bean注入到我的控制器中,并为我的ui提供了一个订阅方法,如下所示:

public class VinResponseEventController {

    ConnectableFlux<ServerSentEvent<VinChangedEvent>> vinResponseKafkaEventPublisher;

    @Autowired
    public VinResponseEventController(ConnectableFlux<ServerSentEvent<VinChangedEvent>> vinResponseKafkaEventPublisher) {
        this.vinResponseKafkaEventPublisher = vinResponseKafkaEventPublisher;
        this.vinResponseKafkaEventPublisher.connect();
    }

    @CrossOrigin()
    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<VinChangedEvent> getVinEvents() {
        return vinResponseKafkaEventPublisher
                .map(record ->
                {
                    log.info("Sending data to UI = " + record.data());
                    return record.data();
                });
    }

在断开连接之前,我的angular ui会为此服务创建一个事件源大约一分钟。然后,我重试可观察对象并再次与它连接。不过,我注意到服务器并没有放弃上一次订阅,即使从浏览器的Angular 来看,我不再关注它。我想知道我是否在后端遗漏了什么?是否应该断开服务器端连接?我还注意到,当我关闭jboss服务器时,它不会耗尽所有的http请求,也不会正常关闭。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题