如何使用Spring Cloud Stream 4.x生产者和消费者检测Sping Boot 3.x,以关联记录器中的跟踪信息

0lvr5msh  于 2022-12-13  发布在  Spring
关注(0)|答案(1)|浏览(393)

在升级到 Boot 3之后,我必须更新跟踪/关联配置,以便从SpringCloudSleuth切换到新的Micrometer跟踪库。
此时,我可以在日志中看到traceId/spanId信息,这些信息使用HTTP调用和自动检测的WebClient正确地传输到其他服务。
然而,似乎Spring云流Kafka生产者和消费者没有被仪器化。
下面是一个生产者的例子:

logger.debug("Sending message to kafka queue {}", message)
streamBridge.send(bindingName, message)

具有traceId、spanId的日志:

[producer,638b36642e4fe0b203b7f32b746b22de,03b7f32b746b22de] 52233 --- [ctor-http-nio-3] i.s.m.p.p.ProjectTaskEventProducer       : Sending message to kafka queue GenericMessage [xxx]
[producer,638b36642e4fe0b203b7f32b746b22de,03b7f32b746b22de] 52233 --- [ctor-http-nio-3] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: kafka

在消费者方面,我有这个简单的KStream:

@Bean
    fun processEvent() =
        Function<KStream<EventKey, EventValue>, KStream<EventKey, EventValue?>> { events ->
            events.process(
                ProcessorSupplier {
                    Processor<EventKey, EventValue, EventKey, EventValue> {
                        logger.info("{}", it.headers())
                    }
                }
            )
        }

记录档

[consumer,,] 52544 --- [-StreamThread-1] ventKStreamConfiguration$$SpringCGLIB$$0 : RecordHeaders(headers = [RecordHeader(key = target-protocol, value = [107, 97, 102, 107, 97]), RecordHeader(key = spring_json_header_types, value = [123, 34, 116, 97, 114, 103, 101, 116, 45, 112, 114, 111, 116, 111, 99, 111, 108, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false)

正如您所看到的,只传输了两个头(target-protocolspring_json_header_types),缺少b3头。
Micrometer文档中关于消息传递工具的内容非常少,因此不清楚如何在Spring Cloud Stream的上下文中进行消息传递。

  • StreamBridge不应该像WebClient一样自动插入指令吗?
  • 在消费者方面也是如此。
    更新1:

我已经添加了一个ProducerMessageHandlerCustomizer(如图所示),以便能够观察底层的KafkaTemplate

@Configuration
class KafkaProducerConfiguration {

    @Bean
    fun kafkaProducerObservationCustomizer () : ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<EventKey, EventValue>> {
        return ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<EventKey, EventValue>> {
                handler, destinationName ->
            handler.kafkaTemplate.setObservationEnabled(true)
        }
    }
}

当调用StreamBridge时,执行过程在将observationEnabled属性设置为true的定制器中结束:

但是,使用者仍然只获得两个标头:

如果比较关联HTTP调用日志的ObservationRegistry

它与Kafka作品中的不同Template:

问题似乎出在KafkaTemplate中:
x1c4d 1x指令集
observationRegistry在应用程序启动时初始化,此时ProducerMessageHandlerCustomizer还没有被调用,因此observationEnabled的值将始终为false,不执行if块,默认为NOOP注册表:

private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;

更新2:

我已尝试过此解决方法

@Configuration
class KafkaProducerConfiguration {

    @Bean
    fun kafkaProducerObservationCustomizer (applicationContext: ApplicationContext, observationRegistry: ObservationRegistry) : ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<ClientProjectId, ProjectTaskEvent>> {
        return ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<ClientProjectId, ProjectTaskEvent>> {
                handler, destinationName ->
            handler.kafkaTemplate.setObservationEnabled(true)
            handler.kafkaTemplate.setApplicationContext(applicationContext)
            handler.kafkaTemplate.afterSingletonsInstantiated()
        }
    }
}

但是它不起作用。它似乎扰乱了生产者的配置,覆盖了它的值。在我的例子中,它寻找一个本地Kafka集群,而不是配置的集群:

2022-12-05T17:36:06.815+01:00  INFO [metrics-ingestor,,] 8056 --- [| adminclient-2] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-2] Node -1 disconnected.
2022-12-05T17:36:06.816+01:00  WARN [metrics-ingestor,,] 8056 --- [| adminclient-2] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
piv4azn7

piv4azn71#

默认情况下,底层KafkaTemplate不启用千分尺跟踪,您必须将observationEnabled设置为true
https://docs.spring.io/spring-kafka/docs/current/reference/html/#observation
借助Spring Cloud Stream,您可以通过ProducerMessageHandlerCustomizer@Bean
https://docs.spring.io/spring-cloud-stream/docs/4.0.0-M3/reference/html/spring-cloud-stream.html#_advanced_producer_configuration
处理程序类型为KafkaProducerMessageHandler;因此使用handler.getKafkaTemplate().set...来更改其属性。

相关问题