我有一个用Sping Boot 编写的后端应用程序,我必须实现将SSE发送到前端应用程序。我的后端必须支持多个订阅者,并能够发送多个事件到多个用户在前面。此外,我必须以编程方式触发事件,所以我发现它可以通过项目Reactor Sinks类来完成:
private final Map<Long, Many<MyNotification>> sinkMap = new ConcurrentHashMap<>();
public Flux<ServerSentEvent<MyNotification>> createSubscription(Long subscriptionId) {
Sinks.Many<MyNotification> sink;
if (sinkMap.get(subscriptionId) != null) {
sink = sinkMap.get(subscriptionId);
} else {
sink = Sinks.many().multicast().onBackpressureBuffer();
}
sinkMap.put(subscriptionId, sink);
return sink.asFlux().map(event -> ServerSentEvent.builder(event).build());
}
我有以下问题:
1.既然我发现应用程序本身是水平缩放的,那么在Map中保存汇是否合适?在更多应用程序并行的情况下,它将如何表现?
1.有没有一种方法可以以某种方式“存储这些Flume”(即Redis之类的),这样订阅就可以集中了?
1.是否有办法知道订阅者是否仍在订阅,以便在必要时清除Map?
1.对于在Sping Boot 中实现SSE,您是否有其他建议,不包括Reactor Sink?
1条答案
按热度按时间6ojccjat1#
首先,即使使用
ConcurrentHashMap
,您的示例也不是线程安全的,因为您分别调用了get
和put
。您应该使用computeIfAbsent
或compute
来保证Map与密钥相关的原子更新。也不清楚你计划在Map中保留Flume多久-是否会重复使用单个Flume并移交给新的消费者。每个消费者的Flume是否独一无二?至于将消息发布到集群中的多个客户端,这取决于您要做的事情。1.是也不是您可以在Map中保留参考,只要您愿意。一旦订阅者计数降至零,接收器即停止工作,必须将其删除(除非禁用autoCancel),在这种情况下,可以重新使用接收器。
1.是的。这可以通过多种方式实现。最简单的选择是忽略订阅者计数,并将接收器永远保持为单例。
1.React堆是最简单的。