kafka使用者不使用来自现有主题的消息

cgvd09ve  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(327)

我在docker上安装了confluent kafka。在这个主题中,我有10个分区。问题是我不能使用来自该主题的消息,但我可以在该主题中生成消息。我正在尝试使用c#confluent.kafka driver 1.5.1(最新版本)和librd.kafka 1.5.0(最新版本)来使用本主题中的内容。
我开始使用的docker compose文件如下

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    networks:
      - bridge_network
    ports:
      - "3001:3001"    
    environment:
      ZOOKEEPER_CLIENT_PORT: 3001
      ZOOKEEPER_TICK_TIME: 3000

  broker:
    image: confluentinc/cp-kafka
    hostname: broker
    depends_on:
      - zookeeper
    ports:
      - "3002:3002"
    networks:
      - bridge_network
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:3001'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:3002'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

  kafka_manager:
    image: sheepkiller/kafka-manager
    hostname: kafka_manager
    depends_on:
      - zookeeper
    ports:
      - '9000:9000'
    networks:
      - bridge_network
    environment:
      ZK_HOSTS: 'zookeeper:3001'
networks:
  bridge_network:
    driver: bridge
    driver_opts:
      com.docker.network.enable_ipv6: "false"

我在c#中的使用者配置如下:

var consumer = new ConsumerBuilder<string, string>(new Dictionary<string, string>
            {
                { "bootstrap.servers", "PLAINTEXT://localhost:3002" },
                { "group.id", "some-test-group" },
                { "auto.offset.reset", "latest"},
                { "compression.codec", "gzip" },
                { "enable.auto.commit", "false" }
            }).Build();

            consumer.Subscribe("some-test-topic");

            while (true)
            {
                var cr = consumer.Consume(30_000);
                if (cr == null || cr.Message.Key == null || cr.Message.Value == null)
                {
                    System.Console.WriteLine("that's it");
                    break;
                }

                System.Console.WriteLine(cr.Message.Key + ": " + cr.Message.Value);
            }

我确信主题的分区中有消息,因为我可以使用kafka工具2.0检查主题

我用于kafka工具的配置是


我很确定我在配置文件中漏掉了一些东西,但是在阅读了两天的文档后,我仍然找不到这个问题。有人能帮忙吗?

sirbozc5

sirbozc51#

问题在于代理和主题复制因素。我使用docker compose文件部署kafka,我连接查看日志并显示以下消息:

ERROR [KafkaApi-1] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)

为了解决这个问题,我必须为代理配置添加“kafka\u offsets\u topic\u replication\u factor:1”。因此,我的代理服务配置如下所示:

broker:
    image: confluentinc/cp-kafka
    hostname: broker
    depends_on:
      - zookeeper
    ports:
      - "3002:3002"
    networks:
      - bridge_network
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:3001'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:3002'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

重新启动代理后,我能够生成/使用消息。

chhqkbe1

chhqkbe12#

你需要设置 auto.offset.reset 在用户运行时“最早”或生成主题消息。

相关问题