如何避免在sping reactor中对类似订户重复Map操作?

kq0g1dla  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(272)

我有一个发布者发出字符串,而许多订阅者可能使用相同的Map函数来创建具有不同过滤器的模型。
出版商:

val publisher: Flux<String> = ...

订户#1

val sub1 = publisher.map{veryExpensiveConverter.convert(it)}
                    .filter(it.metric<10)

订户#2

val sub2 = publisher.map{veryExpensiveConverter.convert(it)}
                    .filter(it.metric>5)

订户#3

val sub3 = sub2.map{cheapConverter.convert(it)}
                    .filter(it.metric>8)

订户#4

val sub4 = sub3.map{yetAnotherConverter.convert(it)}
                    .filter(it.metric>80)

最后我订阅了所有的流量

Flux.merge(sub1, sub2, sub3, ..., subn)
     .map{//some logic for following data of subscribers}
     .subscribe()

问题是:veryexpensiveconverter对于每个订阅服务器的同一个已发布记录执行多次。执行流看起来

Input1 -> veryExpensiveConverter -> filter1 -> output1
       -> veryExpensiveConverter -> filter2 -> output2
       -> veryExpensiveConverter -> cheapConverter -> filter3 -> output3

我也想要

Input1 -> veryExpensiveConverter -> filter1 -> output1  
                                 -> filter2 -> output2
                                 -> cheapConverter -> filter3 -> output3

什么样的模式最适合避免对每个订户执行相同的Map?

b5buobof

b5buobof1#

你可以 .share() 在某种程度上,确保对该共享部分的每个订阅只触发其上的单个订阅。
你也可以看看 .publish().() 更高级的自动触发器的方法( .share() 将在第一次订阅后立即启动其源代码)。
像这样:

val expensiveDoneOnce = publisher
    .map{veryExpensiveConverter.convert(it)}
    .publish()
    .refCount(2)
val sub1 = expensiveDoneOnce.filter(it.metric < 10)
val sub2 = expensiveDoneOnce.filter(it.metric > 5)

相关问题