应答场景

rmbxnbpk  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(428)

我正在实现请求/应答场景的poc,以便使用kafka移动基于事件的微服务堆栈。
Spring 有两种选择。我不知道哪一个更好用。 ReplyingKafkaTemplate 或者 cloud-stream 首先是 ReplyingKafkaTemplate 它可以很容易地配置为有专门的通道来回复每个示例的主题。 record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, provider.getReplyChannelName().getBytes())); 消费者应该不需要知道回复主题的名称,只要听一个主题并用给定的回复主题返回即可。

@KafkaListener(topics = "${kafka.topic.concat-request}")
@SendTo
public ConcatReply listen(ConcatModel request) {
       .....
}

第二种选择是使用 StreamListener , spring-integration 以及 IntegrationFlows . 应配置网关并筛选回复主题。

@MessagingGateway
public interface StreamGateway {
    @Gateway(requestChannel = START, replyChannel = FILTER, replyTimeout = 5000, requestTimeout = 2000)
    String process(String payload);
}
@Bean
public IntegrationFlow headerEnricherFlow() { 
    return IntegrationFlows.from(START)
            .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
            .enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(Channels.INSTANCE_ID ,instanceUUID)) 
            .channel(Channels.REQUEST)
            .get();
}
@Bean
public IntegrationFlow replyFiltererFlow() {
    return IntegrationFlows.from(GatewayChannels.REPLY)
            .filter(Message.class, message -> Channels.INSTANCE_ID.equals(message.getHeaders().get("instanceId")) )
            .channel(FILTER)
            .get();
}

建筑答复

@StreamListener(Channels.REQUEST)
@SendTo(Channels.REPLY)
public Message<?> process(Message<String> request) {

必须指定回复通道。因此,接收到的回复主题将根据instanceid进行过滤,这是一种解决方法(可能会使网络膨胀)。另一方面,通过添加

consumer:
                enableDlq: true

使用springcloudstreams在与rabbitmq和其他特性的互操作性方面看起来很有前途,但目前还没有正式支持请求-应答场景。问题仍然悬而未决,没有被拒绝也(https://github.com/spring-cloud/spring-cloud-stream/issues/1800)
欢迎任何建议。

ss2ws0br

ss2ws0br1#

spring云流不是为请求/应答而设计的;这是可以做到的,这不是简单的,你必须写代码。
@KafkaListener 框架为您处理一切。
如果您想让它也与rabbitmq一起工作,可以使用 @RabbitListener 也。

相关问题