使用Kafka spring Boot 3和opentelemetry的分布式跟踪不工作

jhkqcmku  于 2022-12-23  发布在  Spring
关注(0)|答案(2)|浏览(175)

我想尝试在spring Boot 3 Kafka生产者和消费者之间进行跟踪,我遵循了https://spring.io/blog/2022/10/12/observability-with-spring-boot-3中的示例。当我在生产者和消费者之间使用resttemplate进行API调用时,会自动添加traceID,但Kafka消息不携带它们。

<dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing-bridge-otel</artifactId>
        </dependency>

        <dependency>
            <groupId>io.opentelemetry</groupId>
            <artifactId>opentelemetry-exporter-zipkin</artifactId>
        </dependency>
     <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>3.0.0</version>
        </dependency>

Kafka生产者结构

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    //typical properties 
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
    KafkaTemplate<String, String> stringStringKafkaTemplate = new KafkaTemplate<>(producerFactory);
    stringStringKafkaTemplate.setObservationEnabled(true);//trying out with
    return stringStringKafkaTemplate;
}

正在发送消息

kafkaTemplate.send("topic-1" , "message");

在消费者方面:

@KafkaListener(topics = "topic-1", groupId = "group1")
    public void listenGroupFoo(String message) {
        logger.info("Received Message in group foo: " + message);
    }

当我检查Kafka消息头时,每条消息都有traceparent头,但在使用者日志中看不到traceId和spanId

bxpogfeg

bxpogfeg1#

Spring for Apache Kafka默认情况下不启用跟踪;见
https://docs.spring.io/spring-kafka/docs/current/reference/html/#observation

    • 千分尺观察**

从版本3.0开始,KafkaTemplate和侦听器容器现在支持使用千分尺进行观察。
在每个组件上设置observationEnabled,使能观察;这将禁用测微计计时器,因为计时器现在将通过每次观察进行管理。

wvmv3b1j

wvmv3b1j2#

我能够使它的工作,使观察消费者工厂以及.更改

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

         //The following piece of code did the trick    
        factory.getContainerProperties().setObservationEnabled(true);
        factory.setConsumerFactory(consumerFactory);

        return factory;
    }

相关问题