为kafka服务配置侦听器时出错没有经纪人

cyvaqqii  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(320)

我试着用kafka@wurstmeister创建一个docker。
场景:我开发了一个多微服务架构。具体来说:我有一个springboot应用程序,它将json发送给我的kafka代理。 flask 服务使用数据。这在运行整个think outside docker时有效。我也能够发送数据到Kafka主题在码头。
代码: flask :

KafkaHost = "kafka:9092"
def initkafka():
    # connect to Kafka server and pass the topic we want to consume
    consumer = KafkaConsumer("TEST",
                             group_id='view',
                             bootstrap_servers=[Constants.KafkaHost]
                             )
    KafkaConsumer(auto_offset_reset='latest',
                  enable_auto_commit=False)

    KafkaConsumer(value_deserializer=lambda m: json.loads(m.dedoce('utf-8')))
    KafkaConsumer(consumer_timeout_ms=1000)
    return consumer

docker撰写:

zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      - test-net

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      #KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.17.0.1:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "TEST:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
    networks:
      - test-net

错误

Traceback (most recent call last):
  File "run.py", line 1, in <module>
    from controller import Controller
  File "/app/controller/Controller.py", line 27, in <module>
    consumer = KafkaConfig.initkafka()
  File "/app/config/KafkaConfig.py", line 16, in initkafka
    enable_auto_commit=False)
  File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 324, in __init__
    self._client = KafkaClient(metrics=self._metrics,**self.config)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 221, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 826, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

我认为这是一个环境配置的问题。我已经阅读了wurstmeister文档,但是我不知道我需要设置什么才能让我的flask服务找到kafka代理。日志显示Kafka正在运行,主题“test”已经创建。我是否必须配置侦听器,例如,在我的网络中使用ip和端口来侦听kafka?因为在Kafka文档中,listeners被描述为
要发布到zookeeper供客户端使用的侦听器(如果与侦听器配置属性不同)。在iaas环境中,这可能需要与代理绑定到的接口不同。如果未设置,则将使用侦听器的值。与侦听器不同,播发0.0.0.0元地址是无效的。

iovurdzv

iovurdzv1#

除非我弄错了, KAFKA_ADVERTISED_LISTENERS 需要与您在flask客户端中定义的kafka主机具有相同的值。因此,如果您是从docker容器内部连接到kafka,那么您应该 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 . 如果从主机连接,则应 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 .
或者,您可以省略 KAFKA_ADVERTISED_LISTENERS 设置和定义 KAFKA_ADVERTISED_HOST_NAME: kafka .

nhjlsmyf

nhjlsmyf2#

所以我经历了@cricket\u007提到的这个。现在有点清楚了,但我还是很难搞清楚。作为对我的场景的回顾:我在同一个docker网络中运行我的所有服务和messagebroker。所以没有外部连接。在这篇博文中有一个例子:

KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB

我想我知道这个配置意味着什么。在我的情况下,我想我必须这样改变:

KAFKA_LISTENERS: LISTENER_PY://kafka:9092
      KAFKA_ADVERTISED_LISTENERS: LISTENER_PY://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_PY:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_PY

我想Kafka国际经纪人的名字是不需要的,因为我只有一个经纪人。但是侦听器名称(listener\u py)是否依赖于我的flask服务名称或任何其他属性?据我所知,我可以使用“kafka”作为ip,因为我在docker compose中将kafka作为一个名为“kafka”的服务运行。我试过这个配置,但仍然不起作用。我想知道在我的spring服务中,作为生产者连接而不定义任何侦听器配置是如何工作的。

相关问题