RabbitMQ超级流|Spring Cloud Stream消费者组使用RabbitMQ

qmelpv7a  于 12个月前  发布在  Spring
关注(0)|答案(1)|浏览(159)

我有2个服务(服务A和服务B)需要消耗相同的数据。我可以通过使用队列类型作为流来实现这一点。但同时,在自动伸缩期间,我不希望服务A1和服务A2(2个示例)使用相同的数据。
我发现,通过实现超级流,我可以实现这一点,但我没有找到任何好的例子来解决和使用Spring云流也是可能的,但再次无法找到正确的资源。
任何人都可以请提供我一个很好的资源或一个例子来实现这一点使用rabbitmq?

ql3eal8s

ql3eal8s1#

我不知道有任何spring-cloud-stream的例子,但是,一般来说,SCSt应用程序对底层传输是不可知的(这是它的目标之一)。
然而,Spring AMQP项目文档中有关于如何使用spring-rabbit-stream库的示例(包括普通流和超级流)。
https://docs.spring.io/spring-amqp/docs/current/reference/html/#stream-support

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
StreamListenerContainer container(Environment env, String name) {
    StreamListenerContainer container = new StreamListenerContainer(env);
    container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
    container.setupMessageListener(msg -> {
        ...
    });
    container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
    return container;
}

(You需要阅读Spring AMQP文档的其余部分才能理解什么是侦听器容器)。

相关问题