我从localhost连接到Kafka时遇到问题。我在没有zookeper的情况下使用KRaft和Kafka。
services:
kafka_auth:
image: confluentinc/cp-kafka:7.2.1
environment:
KAFKA_NODE_ID: 2
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://kafka_auth:9092,CONTROLLER://kafka_auth:9093,EXTERNAL://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka_auth:9092,EXTERNAL://127.0.0.1:9094
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka_mqtt:9093,2@kafka_auth:9093,3@kafka_server:9093'
KAFKA_PROCESS_ROLES: 'broker,controller'
volumes:
- ./run_workaround.sh:/tmp/run_workaround.sh
command: "bash -c '/tmp/run_workaround.sh && /etc/confluent/docker/run'"
depends_on:
- kafka_mqtt
ports:
- "9094:9094"
api:
image: node:18
volumes:
- ./:/app
command: sh -c "yarn install && yarn start:debug"
depends_on:
- kafka_mqtt
字符串
当我从API服务连接时,它连接没有任何错误,但是当我尝试在端口9094上使用相同的代码连接Docker外部时,我得到一个错误:
{"level":"ERROR","timestamp":"2023-08-06T01:16:06.815Z","logger":"kafkajs","message":"[BrokerPool] Closed connection","retryCount":27,"retryTime":10000}
型
run_workaround.sh
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure
echo "kafka-storage format --ignore-formatted -t CLUSTER_ID -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure
型
NodeJS
import {
Kafka,
Consumer,
} from 'kafkajs';
const kafka = new Kafka({
clientId: 'clientId',
brokers: process.env.KAFKA_BOOTSTRAP_SERVERS.split(','),
requestTimeout: 3600000,
retry: {
maxRetryTime: 10000,
initialRetryTime: 10000,
retries: 999999999,
},
});
const consumer = kafka.consumer({ groupId: 'groupId' });
consumer.connect().then(async () => {
// here I also check all topics and create them if they don't exist
await consumer.subscribe({ topics: ['topic1'] });
await consumer.run({
eachMessage: async ({ topic, message }) => {
if (!message?.value) {
return;
}
switch (topic) {
case 'topic1':
method(message.value.toString());
break;
default:
break;
}
},
});
型
1条答案
按热度按时间6xfqseft1#
我通过将所有Kafka服务添加到同一个网络和一个额外的主机host.docker.internal来修复它。
字符串