Spring Integration中的跟踪(Sping Boot 3.0.9)

rqdpfwrv  于 2023-09-29  发布在  Spring
关注(0)|答案(1)|浏览(136)

我正在尝试使用Spring集成:我通过JMS接收消息并将其发送给Kafka:

@Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(mqInQueue))
                .log(LoggingHandler.Level.DEBUG, message -> "Received JMS message: " + message.getPayload())
                .channel(channels -> MessageChannelFactory.create(channels, "request-channel-1"))
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(kafkaOutTopic));
    }

对于检查,我使用POST请求+ JmsTemplate向MQ发送消息:

@PostMapping("/mq")
    public String sentToMq(@RequestBody final String body) {
        jmsTemplate.convertAndSend(mqRequestQueue, body, m -> {
            final var span = tracer.startScopedSpan("jms-send");
            try {
                final var context = span.context();
                m.setStringProperty("b3", "%s-%s-%s".formatted(context.traceId(), context.spanId(), Boolean.TRUE.equals(context.sampled()) ? "1" : "0"));
            } finally {
                span.end();
            }

            return m;
        });
        return "done";
    }

一切正常,除了跟踪。我在发送前手动设置了b3头,但在接收到Spring Integration后覆盖了它。我应该做什么来保持传入的traceId?
我还有下一个配置:

  • @EnableIntegrationManagement(observationPatterns = "*")
@Bean
  @GlobalChannelInterceptor(order = Ordered.HIGHEST_PRECEDENCE)
  public ChannelInterceptor observationPropagationChannelInterceptor(final ObservationRegistry observationRegistry) {
      return new ObservationPropagationChannelInterceptor(observationRegistry);
  }
kzmpq1sx

kzmpq1sx1#

好的。我知道问题出在哪里了。JmsMessageDrivenEndpoint不会将其registerObservationRegistry(ObservationRegistry)向下传播到ChannelPublishingJmsMessageListener,因此IntegrationObservation.HANDLER不会从该b3标头恢复跟踪。
MessageChannel的观察仅参考当前上下文,并根据当前上下文实际填充新的b3头部。
考虑使用Jms.messageDrivenChannelAdapter(AbstractMessageListenerContainer),并使用AbstractMessageListenerContainer.setObservationRegistry()来启用其JmsObservationDocumentation.JMS_MESSAGE_PROCESS使用者跟踪。
对于JmsMessageDrivenEndpoint,请在Spring Integration中提出GH问题。

相关问题