我正在实现请求/应答场景的poc,以便使用kafka移动基于事件的微服务堆栈。
Spring 有两种选择。我不知道哪一个更好用。 ReplyingKafkaTemplate
或者 cloud-stream
首先是 ReplyingKafkaTemplate
它可以很容易地配置为有专门的通道来回复每个示例的主题。 record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, provider.getReplyChannelName().getBytes()));
消费者应该不需要知道回复主题的名称,只要听一个主题并用给定的回复主题返回即可。
@KafkaListener(topics = "${kafka.topic.concat-request}")
@SendTo
public ConcatReply listen(ConcatModel request) {
.....
}
第二种选择是使用 StreamListener
, spring-integration
以及 IntegrationFlows
. 应配置网关并筛选回复主题。
@MessagingGateway
public interface StreamGateway {
@Gateway(requestChannel = START, replyChannel = FILTER, replyTimeout = 5000, requestTimeout = 2000)
String process(String payload);
}
@Bean
public IntegrationFlow headerEnricherFlow() {
return IntegrationFlows.from(START)
.enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
.enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(Channels.INSTANCE_ID ,instanceUUID))
.channel(Channels.REQUEST)
.get();
}
@Bean
public IntegrationFlow replyFiltererFlow() {
return IntegrationFlows.from(GatewayChannels.REPLY)
.filter(Message.class, message -> Channels.INSTANCE_ID.equals(message.getHeaders().get("instanceId")) )
.channel(FILTER)
.get();
}
建筑答复
@StreamListener(Channels.REQUEST)
@SendTo(Channels.REPLY)
public Message<?> process(Message<String> request) {
必须指定回复通道。因此,接收到的回复主题将根据instanceid进行过滤,这是一种解决方法(可能会使网络膨胀)。另一方面,通过添加
consumer:
enableDlq: true
使用springcloudstreams在与rabbitmq和其他特性的互操作性方面看起来很有前途,但目前还没有正式支持请求-应答场景。问题仍然悬而未决,没有被拒绝也(https://github.com/spring-cloud/spring-cloud-stream/issues/1800)
欢迎任何建议。
1条答案
按热度按时间ss2ws0br1#
spring云流不是为请求/应答而设计的;这是可以做到的,这不是简单的,你必须写代码。
与
@KafkaListener
框架为您处理一切。如果您想让它也与rabbitmq一起工作,可以使用
@RabbitListener
也。