How do I manually inspect the headers of messages in a Kafka topic? Getting "confluentinc_kafkacat_1 exited with code 2"

tkqqtvp1 posted on 2021-06-04 in Kafka

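Goal: I want to inspect the headers of the messages in a topic, and I am looking for a simple way to view them. I don't want to develop an extra application or extend existing code just for this, so any straightforward tool for viewing headers would be useful.
From this question I read that it is possible with kafkacat: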

kafkacat -b kafka-broker:9092 -t my_topic_name -C \
  -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %h\n'

So I would like to start kafkacat as part of my Docker Compose setup. My first attempt was:

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:5.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  kafka-tools:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka
    container_name: kafka
    command: ["tail", "-f", "/dev/null"]
    network_mode: "host"

  kafkacat:
    image: confluentinc/cp-kafkacat
    command: 
      - bash 
      - -c 
    links:
      - broker

I got:

broker         | [2020-12-11 10:46:13,084] DEBUG [Controller id=1] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController)
broker         | [2020-12-11 10:46:13,084] TRACE [Controller id=1] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
confluentinc_kafkacat_1 exited with code 2
broker         | [2020-12-11 10:51:13,085] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)

My second attempt was based on the kafkacat tutorial:

C:\Users\DEMETRC>docker run --tty confluentinc/cp-kafkacat kafkacat -b localhost:9092 -L
% ERROR: Failed to acquire metadata: Local: Broker transport failure

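P.S. My Dockerized Kafka is reachable at localhost:9092, without a Docker network. In the tutorial's example, Kafka is at kafka:29092 on a Docker Compose default network.
Do you know how to add kafkacat to my Docker Compose file? Are there any other suggestions for inspecting message headers?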

*** First edit, after Robin Moffatt's answer

I edited my Docker Compose file to include Robin's suggestion:

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-server:5.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  kafka-tools:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka
    container_name: kafka
    command: ["tail", "-f", "/dev/null"]
    network_mode: "host"

  kafkacat:
    image: edenhill/kafkacat:1.6.0
    container_name: kafkacat
    links:
      - broker
    entrypoint: 
      - /bin/sh 
      - -c 
      - |
        apk add jq; 
        while [ 1 -eq 1 ];do sleep 60;done

Here is my topic description:


# kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demotopic

Created topic demotopic.

# kafka-topics --describe --bootstrap-server localhost:9092 --topic demotopic

Topic: demotopic        PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: demotopic        Partition: 0    Leader: 1       Replicas: 1     Isr: 1

# 

I get an error when I use kafkacat:

/ # kafkacat -b localhost:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p
 Offset: %o Headers: %h\n'
%3|1607709622.155|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1607709623.155|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
% ERROR: Failed to query metadata for topic demotopic: Local: Broker transport failure
/ #

*** Second edit

Now I can list all the topics with kafkacat, but I still cannot get the message headers:

/ # kafkacat -L -b broker:9092
Metadata for all topics (from broker -1: broker:9092/bootstrap):
 1 brokers:
  broker 1 at localhost:9092 (controller)
 4 topics:
  topic "demotopic" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "_confluent-license" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
  topic "_confluent-metrics" with 12 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
    partition 1, leader 1, replicas: 1, isrs: 1
    partition 2, leader 1, replicas: 1, isrs: 1
    partition 3, leader 1, replicas: 1, isrs: 1
    partition 4, leader 1, replicas: 1, isrs: 1
    partition 5, leader 1, replicas: 1, isrs: 1
    partition 6, leader 1, replicas: 1, isrs: 1
    partition 7, leader 1, replicas: 1, isrs: 1
    partition 8, leader 1, replicas: 1, isrs: 1
    partition 9, leader 1, replicas: 1, isrs: 1
    partition 10, leader 1, replicas: 1, isrs: 1
    partition 11, leader 1, replicas: 1, isrs: 1
  topic "__confluent.support.metrics" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
/ # kafkacat -b broker:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Of
fset: %o Headers: %h\n'
%3|1607719862.133|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/1]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
% ERROR: Local: Broker transport failure: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)

Is there something wrong with my kafkacat command?

kafkacat -b broker:9092 -t demotopic -C -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Headers: %h\n'
Answer 1, by btqmn9zl

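"confluentinc_kafkacat_1 exited with code 2" appears because the container exited: you overrode the startup command to launch bash with -c but gave it no command string, so bash had nothing to run and exited straight away.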

command: 
      - bash 
      - -c
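
To see where the exit code comes from without involving Docker at all, here is a minimal sketch. It assumes bash is available on your PATH, and Python is used only to capture the exit status. It runs the same `bash -c` invocation with no command string:

```python
import subprocess

# Mirror the Compose `command:` above: bash is given -c but no command string.
result = subprocess.run(["bash", "-c"], capture_output=True, text=True)

# bash treats the missing argument as a usage error and exits immediately.
print("exit code:", result.returncode)
print("stderr:", result.stderr.strip())
```

On a typical bash this should report a usage error for `-c` and a non-zero exit code matching the one Docker Compose printed, which is why the container never stays up.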

This file shows a working example of what you need, in which the container stays running:

entrypoint: 
      - /bin/sh 
      - -c 
      - |
        apk add jq; 
        while [ 1 -eq 1 ];do sleep 60;done

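Aydin's point is a fair one: you don't need kafkacat in Docker, and you can run it locally. Still, I find it easier to include it in Docker Compose so that you don't depend on a local installation.
If you want to run kafkacat outside of Docker Compose but still in Docker, using docker run, you can, but you have to bear in mind the networking implications. If you use localhost, it is relative to the container itself, that is, to wherever kafkacat is running. Instead, you need to make the Kafka broker reachable from the kafkacat container, for example by attaching the container to the host network: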

docker run --network host --interactive --rm edenhill/kafkacat:1.6.0 -b localhost:9092 -L
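
Regarding the second edit in the question: `-L` against broker:9092 succeeds, but the metadata says "broker 1 at localhost:9092", and the consumer then fails connecting to 127.0.0.1:9092. That pattern usually points at the advertised listeners rather than at the kafkacat command itself. The sketch below is only an illustration of that reasoning in Python, not anything Kafka ships. The function names are made up for this example, and it assumes each listener binds to the same port it advertises, as in the Compose file above:

```python
from typing import Dict, Tuple


def parse_listeners(advertised: str) -> Dict[str, Tuple[str, int]]:
    """Map listener name -> (host, port) from a KAFKA_ADVERTISED_LISTENERS value."""
    listeners = {}
    for entry in advertised.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


def advertised_for_port(advertised: str, bootstrap_port: int) -> Tuple[str, int]:
    """Return the address the broker hands back to a client that bootstrapped on this port.

    Assumption: every listener binds to the port it advertises.
    """
    for host, port in parse_listeners(advertised).values():
        if port == bootstrap_port:
            return host, port
    raise ValueError(f"no listener on port {bootstrap_port}")


# Value copied from the Compose file in the question.
ADVERTISED = "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"

print(advertised_for_port(ADVERTISED, 9092))   # ('localhost', 9092)
print(advertised_for_port(ADVERTISED, 29092))  # ('broker', 29092)
```

In other words, a client that bootstraps on port 9092 is told to continue at localhost:9092, which inside the kafkacat container is the container itself. From inside the Compose network, pointing kafkacat at `-b broker:29092` should give it an address it can actually reach.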
