springcloud流kafka度量未显示在启动器度量| springboot 2.2.2 | springcloud hoxton.sr4中

jpfvwuh4  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(341)

我没有看到Kafka消费者/生产者的指标使用执行器/执行器/指标。只显示基本的jvm度量。

{
"names": [
"jvm.buffer.memory.used",
"jvm.threads.states",
"jvm.memory.committed",
"spring.integration.channels",
"process.uptime",
"jvm.memory.used",
"jvm.gc.pause",
"logback.events",
"http.server.requests",
"jvm.memory.max",
"tomcat.sessions.active.current",
"jvm.buffer.total.capacity",
"system.cpu.usage",
"jvm.threads.live",
"jvm.classes.unloaded",
"jvm.classes.loaded",
"jvm.threads.peak",
"tomcat.sessions.rejected",
"tomcat.sessions.alive.max",
"jvm.gc.memory.promoted",
"jvm.buffer.count",
"jvm.gc.memory.allocated",
"tomcat.sessions.expired",
"tomcat.sessions.created",
"jvm.gc.max.data.size",
"system.cpu.count",
"spring.integration.handlers",
"spring.integration.sources",
"process.start.time",
"jvm.threads.daemon",
"tomcat.sessions.active.max",
"jvm.gc.live.data.size",
"process.cpu.usage"
]
}

我的应用程序运行良好,绑定看起来也正常,在/actuator/bindings中可见。

[
{
"bindingName": "send-kafka-out",
"name": "test.topic",
"group": null,
"pausable": false,
"state": "running",
"extendedInfo": {
"bindingDestination": "test.topic",
"ExtendedProducerProperties": {
"autoStartup": true,
"partitionCount": 1,
"headerMode": "headers",
"extension": {
"bufferSize": 16384,
"compressionType": "none",
"sync": false,
"sendTimeoutExpression": null,
"batchTimeout": 0,
"messageKeyExpression": null,
"headerPatterns": null,
"configuration": {},
"topic": {
"replicationFactor": null,
"replicasAssignments": {},
"properties": {}
},
"useTopicHeader": false,
"recordMetadataChannel": null
},
"validPartitionSelectorProperty": true,
"validPartitionKeyProperty": true
}
},
"input": false
}
]

下面是我的流配置

public interface KafkaStreams {
    String OUTPUT = "send-kafka-out";

    @Output(OUTPUT)
    MessageChannel getOutboundMessageChannel();

}

@SpringBootApplication
@EnableDiscoveryClient
@EnableJms
@EnableBinding(KafkaStreams.class)
@ComponentScan("com.test.connector.*")
public class ConnectorApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConnectorApplication.class, args);
    }

}

下面是我的config.yml

spring:
  jms:
    listener:
      auto-startup: true
      acknowledge-mode: AUTO
      max-concurrency: 10
      concurrency: 10
  cloud:
    stream:
      default-binder: kafka-default
      function:
        definition: 
      kafka:
        binder:
          brokers:
            - localhost:9092
          configuration:
            default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            default.value.serde: org.apache.kafka.common.serialization.Serdes$BytesSerde
            commit.interval.ms: 1000
            #security:
            #  protocol: SASL_PLAINTEXT

      bindings:
        send-kafka-out:
          destination: test.topic
          contentType: text/plain
          group: output-group-1
          binder: kafka-default
          producer:
            header-mode: headers

      binders:
        kafka-default:
         type: kafka
         environment:
           spring:
             cloud:
               stream:
                kafka:
                  binder:
                    brokers: localhost:9092

我使用的是springboot2.2.2.release、springcloud hoxton.sr4和springcloud stream binder kafka 3.0.4.release。任何关于我缺少什么的建议都会有所帮助。

w3nuxt5m

w3nuxt5m1#

你需要有输入绑定。Kafka活页夹指标只支持消费者。看起来代码中只有一个输出绑定。我刚刚尝试了一个快速应用程序,里面有一个消费者,我能够检索到指标:
/actuator/metrics/spring.cloud.stream.binder.kafka.offset {"name":"spring.cloud.stream.binder.kafka.offset","description":"Unconsumed messages for a particular group and topic","baseUnit":null,"measurements":[{"statistic":"VALUE","value":0.0}],"availableTags":[{"tag":"topic","values":["uppercase-in-0"]},{"tag":"group","values":["anonymous.06730d87-0871-4b82-bd82-ae4958dbe2a3"]}]}

相关问题