我有一个发布者发出字符串,而许多订阅者可能使用相同的Map函数来创建具有不同过滤器的模型。
出版商:
val publisher: Flux<String> = ...
订户#1
val sub1 = publisher.map{veryExpensiveConverter.convert(it)}
.filter(it.metric<10)
订户#2
val sub2 = publisher.map{veryExpensiveConverter.convert(it)}
.filter(it.metric>5)
订户#3
val sub3 = sub2.map{cheapConverter.convert(it)}
.filter(it.metric>8)
订户#4
val sub4 = sub3.map{yetAnotherConverter.convert(it)}
.filter(it.metric>80)
最后我订阅了所有的流量
Flux.merge(sub1, sub2, sub3, ..., subn)
.map{//some logic for following data of subscribers}
.subscribe()
问题是:veryexpensiveconverter对于每个订阅服务器的同一个已发布记录执行多次。执行流看起来
Input1 -> veryExpensiveConverter -> filter1 -> output1
-> veryExpensiveConverter -> filter2 -> output2
-> veryExpensiveConverter -> cheapConverter -> filter3 -> output3
我也想要
Input1 -> veryExpensiveConverter -> filter1 -> output1
-> filter2 -> output2
-> cheapConverter -> filter3 -> output3
什么样的模式最适合避免对每个订户执行相同的Map?
1条答案
按热度按时间b5buobof1#
你可以
.share()
在某种程度上,确保对该共享部分的每个订阅只触发其上的单个订阅。你也可以看看
.publish().()
更高级的自动触发器的方法(.share()
将在第一次订阅后立即启动其源代码)。像这样: