我的ui在angular9上,而我的后端使用spring-boot、webflux、reactor-core和kafka。我的想法是,当后端从kafka主题接收到事件时,我希望在ui中显示这些事件。我创造了这样一个可连接的通量:
@Bean
ConnectableFlux<ServerSentEvent<VinChangedEvent>> vinResponseKafkaEventPublisher(ReceiverOptions<String, VinChangedEvent> vinResponseKafkaReceiverOptions) {
return KafkaReceiver.create(vinResponseKafkaReceiverOptions)
.receive()
.map(vinChangedEventReceiverRecord -> ServerSentEvent.builder(vinChangedEventReceiverRecord.value()).build())
.publish();
}
我将这个bean注入到我的控制器中,并为我的ui提供了一个订阅方法,如下所示:
public class VinResponseEventController {
ConnectableFlux<ServerSentEvent<VinChangedEvent>> vinResponseKafkaEventPublisher;
@Autowired
public VinResponseEventController(ConnectableFlux<ServerSentEvent<VinChangedEvent>> vinResponseKafkaEventPublisher) {
this.vinResponseKafkaEventPublisher = vinResponseKafkaEventPublisher;
this.vinResponseKafkaEventPublisher.connect();
}
@CrossOrigin()
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<VinChangedEvent> getVinEvents() {
return vinResponseKafkaEventPublisher
.map(record ->
{
log.info("Sending data to UI = " + record.data());
return record.data();
});
}
在断开连接之前,我的angular ui会为此服务创建一个事件源大约一分钟。然后,我重试可观察对象并再次与它连接。不过,我注意到服务器并没有放弃上一次订阅,即使从浏览器的Angular 来看,我不再关注它。我想知道我是否在后端遗漏了什么?是否应该断开服务器端连接?我还注意到,当我关闭jboss服务器时,它不会耗尽所有的http请求,也不会正常关闭。
暂无答案!
目前还没有任何答案,快来回答吧!