我试着用kafka@wurstmeister创建一个docker。
场景:我开发了一个多微服务架构。具体来说:我有一个springboot应用程序,它将json发送给我的kafka代理。 flask 服务使用数据。这在运行整个think outside docker时有效。我也能够发送数据到Kafka主题在码头。
代码: flask :
KafkaHost = "kafka:9092"
def initkafka():
# connect to Kafka server and pass the topic we want to consume
consumer = KafkaConsumer("TEST",
group_id='view',
bootstrap_servers=[Constants.KafkaHost]
)
KafkaConsumer(auto_offset_reset='latest',
enable_auto_commit=False)
KafkaConsumer(value_deserializer=lambda m: json.loads(m.dedoce('utf-8')))
KafkaConsumer(consumer_timeout_ms=1000)
return consumer
docker撰写:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
networks:
- test-net
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
#KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1
KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.17.0.1:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "TEST:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
networks:
- test-net
错误
Traceback (most recent call last):
File "run.py", line 1, in <module>
from controller import Controller
File "/app/controller/Controller.py", line 27, in <module>
consumer = KafkaConfig.initkafka()
File "/app/config/KafkaConfig.py", line 16, in initkafka
enable_auto_commit=False)
File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 324, in __init__
self._client = KafkaClient(metrics=self._metrics,**self.config)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 221, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 826, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
我认为这是一个环境配置的问题。我已经阅读了wurstmeister文档,但是我不知道我需要设置什么才能让我的flask服务找到kafka代理。日志显示Kafka正在运行,主题“test”已经创建。我是否必须配置侦听器,例如,在我的网络中使用ip和端口来侦听kafka?因为在Kafka文档中,listeners被描述为
要发布到zookeeper供客户端使用的侦听器(如果与侦听器配置属性不同)。在iaas环境中,这可能需要与代理绑定到的接口不同。如果未设置,则将使用侦听器的值。与侦听器不同,播发0.0.0.0元地址是无效的。
2条答案
按热度按时间iovurdzv1#
除非我弄错了,
KAFKA_ADVERTISED_LISTENERS
需要与您在flask客户端中定义的kafka主机具有相同的值。因此,如果您是从docker容器内部连接到kafka,那么您应该KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
. 如果从主机连接,则应KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
.或者,您可以省略
KAFKA_ADVERTISED_LISTENERS
设置和定义KAFKA_ADVERTISED_HOST_NAME: kafka
.nhjlsmyf2#
所以我经历了@cricket\u007提到的这个。现在有点清楚了,但我还是很难搞清楚。作为对我的场景的回顾:我在同一个docker网络中运行我的所有服务和messagebroker。所以没有外部连接。在这篇博文中有一个例子:
我想我知道这个配置意味着什么。在我的情况下,我想我必须这样改变:
我想Kafka国际经纪人的名字是不需要的,因为我只有一个经纪人。但是侦听器名称(listener\u py)是否依赖于我的flask服务名称或任何其他属性?据我所知,我可以使用“kafka”作为ip,因为我在docker compose中将kafka作为一个名为“kafka”的服务运行。我试过这个配置,但仍然不起作用。我想知道在我的spring服务中,作为生产者连接而不定义任何侦听器配置是如何工作的。