version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.1
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
volumes:
- ./.docker-volumes/zookeeper/_data:/var/lib/zookeeper/data
- ./.docker-volumes/zookeeper/_log:/var/lib/zookeeper/log
kafka:
image: confluentinc/cp-kafka:5.5.1
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./.docker-volumes/kafka/_data:/var/lib/kafka/data
字符串
上面是docker-compose文件,用于在docker中启动zookeeper和Kafka示例。
当我运行一个试图与Kafka建立连接的服务时。我得到了低于误差。我在互联网上找到了它,但没有得到太多。
服务层出错
.12:43:18.730 [kafka-producer-network-thread | producer-2] WARN o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-2] Connection to node -1 (localhost/127.0.0.1:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
.12:43:19.295 [kafka-producer-network-thread | producer-1] WARN o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
.12:43:19.481 [kafka-admin-client-thread | vendor_srs_consumer_srs_restaurant_finance_pan_validate_client_id_0] WARN o.apache.kafka.clients.NetworkClient - [AdminClient clientId=vendor_srs_consumer_srs_restaurant_finance_pan_validate_client_id_0] Connection to node -1 (localhost/127.0.0.1:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
型
kafka-producer尝试连接kafka brothers时Docker出错。
[2021-07-12 07:13:19,492] WARN [SocketServer brokerId=1] Unexpected error from /172.20.0.1; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 369296129 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:105)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
at kafka.network.Processor.poll(SocketServer.scala:893)
at kafka.network.Processor.run(SocketServer.scala:792)
at java.lang.Thread.run(Thread.java:748)
型
不知道为什么生产者无法与经纪人连接。先谢谢你。
3条答案
按热度按时间vaj7vani1#
PLAINTEXT_HOST://0.0.0.0:9092
不是有效的通告侦听器;这需要是可解析的地址。在您的错误中,
Connection to node -1 (localhost/127.0.0.1:9092) terminated
,不清楚此代码在哪里运行,但如果在Docker之外的同一主机上,则应通告localhost
,而不是0.0.0.0
您的另一个错误
InvalidReceiveException: Invalid receive (size = 369296129 larger than 104857600)
表明您连接到了其他服务器,并得到了客户端不期望的响应。7d7tgy0s2#
最初,它们在默认的网桥网络上,为了在容器之间进行通信,您必须使用容器IP。
您可以尝试在同一网络上部署两个容器。
在docker-compose中添加网络定义
字符串
在服务定义中,您可以添加网络名称和端口。
例如,对于zookeeper服务
型
然后你可以通过zookeeper-kafka:2181(即zookeeper-kafka)访问zookeeper服务。network-name:port),同时通过Kafka进行连接。
更新Kafka服务定义示例
型
希望这对你有帮助
6pp0gazn3#
我也遇到了同样的问题,我们使用Kafka容器来运行处理生产者或消费者的功能测试。
下面是来自docker-compose.yml的片段,其中提供了zookeeper、Kafka和schema注册表的容器
字符串
现在你可以在Kafka容器中看到
型
容器暴露的安全协议是PLAINTEXT。
但是在我的例子中,问题出在application.yaml文件中。下面是application. yaml中的Kafka生产者配置。现在作为其提到的安全协议是SASL_SSL
型
因此,为了运行我的功能测试,我必须将此安全协议覆盖到PLAINTEXT。因为这只需要进行功能测试,并且在生产配置中,SASL_SSL是正确的。
我们可以通过设置一个名为
SPRING_KAFKA_PROPERTIES_SECURITY_PROTOCOL
的环境变量来覆盖它,这相当于(Spring隐式地解决了这个问题)型
因此,在功能测试模式下,应用程序能够通过覆盖从SASL_SSL到PLAINTEXT的安全协议连接到Kafka容器