我正在尝试使用Spring集成:我通过JMS接收消息并将其发送给Kafka:
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(mqInQueue))
.log(LoggingHandler.Level.DEBUG, message -> "Received JMS message: " + message.getPayload())
.channel(channels -> MessageChannelFactory.create(channels, "request-channel-1"))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(kafkaOutTopic));
}
对于检查,我使用POST请求+ JmsTemplate向MQ发送消息:
@PostMapping("/mq")
public String sentToMq(@RequestBody final String body) {
jmsTemplate.convertAndSend(mqRequestQueue, body, m -> {
final var span = tracer.startScopedSpan("jms-send");
try {
final var context = span.context();
m.setStringProperty("b3", "%s-%s-%s".formatted(context.traceId(), context.spanId(), Boolean.TRUE.equals(context.sampled()) ? "1" : "0"));
} finally {
span.end();
}
return m;
});
return "done";
}
一切正常,除了跟踪。我在发送前手动设置了b3
头,但在接收到Spring Integration后覆盖了它。我应该做什么来保持传入的traceId?
我还有下一个配置:
@EnableIntegrationManagement(observationPatterns = "*")
@Bean
@GlobalChannelInterceptor(order = Ordered.HIGHEST_PRECEDENCE)
public ChannelInterceptor observationPropagationChannelInterceptor(final ObservationRegistry observationRegistry) {
return new ObservationPropagationChannelInterceptor(observationRegistry);
}
1条答案
按热度按时间kzmpq1sx1#
好的。我知道问题出在哪里了。
JmsMessageDrivenEndpoint
不会将其registerObservationRegistry(ObservationRegistry)
向下传播到ChannelPublishingJmsMessageListener
,因此IntegrationObservation.HANDLER
不会从该b3
标头恢复跟踪。对
MessageChannel
的观察仅参考当前上下文,并根据当前上下文实际填充新的b3
头部。考虑使用
Jms.messageDrivenChannelAdapter(AbstractMessageListenerContainer)
,并使用AbstractMessageListenerContainer.setObservationRegistry()
来启用其JmsObservationDocumentation.JMS_MESSAGE_PROCESS
使用者跟踪。对于
JmsMessageDrivenEndpoint
,请在Spring Integration中提出GH问题。