如何从localhost连接docker-compose Kafka?

mklgxw1f  于 2023-11-16  发布在  Apache
关注(0)|答案(1)|浏览(128)

我从localhost连接到Kafka时遇到问题。我在没有ZooKeeper的情况下使用KRaft和Kafka。

services:
  kafka_auth:
    image: confluentinc/cp-kafka:7.2.1
    environment:
      # Broker/controller id of this node; matches voter 2 in the quorum below.
      KAFKA_NODE_ID: 2
      # Security protocol for each named listener; everything is plaintext here.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      # Sockets the broker binds: in-network (9092), KRaft controller (9093),
      # and EXTERNAL on all interfaces (9094) for clients outside Docker.
      KAFKA_LISTENERS: PLAINTEXT://kafka_auth:9092,CONTROLLER://kafka_auth:9093,EXTERNAL://0.0.0.0:9094
      # Addresses handed back to clients: containers get kafka_auth:9092,
      # host clients are told 127.0.0.1:9094 (matches the published port below).
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka_auth:9092,EXTERNAL://127.0.0.1:9094
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      # Three-node KRaft controller quorum, formatted as id@host:port.
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka_mqtt:9093,2@kafka_auth:9093,3@kafka_server:9093'
      # Combined mode: this node acts as both broker and controller.
      KAFKA_PROCESS_ROLES: 'broker,controller'
    volumes:
      - ./run_workaround.sh:/tmp/run_workaround.sh
    # Patch the image's startup scripts for KRaft (no ZooKeeper), then start Kafka.
    command: "bash -c '/tmp/run_workaround.sh && /etc/confluent/docker/run'"
    depends_on:
      - kafka_mqtt
    ports:
      # Publish only the EXTERNAL listener to the host.
      - "9094:9094"

  api:
    image: node:18
    volumes:
      - ./:/app
    command: sh -c "yarn install && yarn start:debug"
    depends_on:
      - kafka_mqtt

字符串
当我从API服务连接时,它连接没有任何错误;但是当我在Docker外部使用相同的代码、通过端口9094尝试连接时,我得到一个错误:

{"level":"ERROR","timestamp":"2023-08-06T01:16:06.815Z","logger":"kafkajs","message":"[BrokerPool] Closed connection","retryCount":27,"retryTime":10000}


run_workaround.sh

# Drop the mandatory ZooKeeper connection check from the image's configure step
# (cp-kafka 7.2.x images assume ZooKeeper; this enables KRaft-only startup).
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
# Neutralize the "wait for ZooKeeper readiness" probe — not needed in KRaft mode.
sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure
# Format KRaft storage before startup. NOTE(review): CLUSTER_ID is a literal
# placeholder — replace it with a real base64 cluster id (e.g. output of
# `kafka-storage random-uuid`), identical on every node of the quorum.
echo "kafka-storage format --ignore-formatted -t CLUSTER_ID -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure


NodeJS

import { Kafka } from 'kafkajs';

// Broker list comes from the environment as a comma-separated string,
// e.g. "127.0.0.1:9094" — presumably set per deployment; verify against compose.
const kafka = new Kafka({
  clientId: 'clientId',
  brokers: process.env.KAFKA_BOOTSTRAP_SERVERS.split(','),
  requestTimeout: 3600000,
  retry: {
    maxRetryTime: 10000,
    initialRetryTime: 10000,
    retries: 999999999,
  },
});

const consumer = kafka.consumer({ groupId: 'groupId' });

consumer
  .connect()
  .then(async () => {
    // here I also check all topics and create them if they don't exist
    await consumer.subscribe({ topics: ['topic1'] });
    await consumer.run({
      eachMessage: async ({ topic, message }) => {
        // Skip tombstones / empty payloads.
        if (!message?.value) {
          return;
        }
        switch (topic) {
          case 'topic1':
            method(message.value.toString());
            break;
          default:
            break;
        }
      },
    });
  }) // FIX: the original snippet never closed this .then() callback.
  .catch((err) => {
    // Surface startup failures instead of letting the rejection go unhandled.
    console.error('Kafka consumer failed to start', err);
  });

6xfqseft

6xfqseft1#

我通过将所有Kafka服务添加到同一个网络和一个额外的主机host.docker.internal来修复它。

services:
  kafka_auth:
    image: confluentinc/cp-kafka:7.2.1
    container_name: kafka_auth
    hostname: kafka_auth
    restart: always
    environment:
      # ...
      # Security protocol for each named listener; all plaintext.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,LISTENER_LOCALHOST:PLAINTEXT,LISTENER_DOCKER:PLAINTEXT'
      # Addresses returned to clients for each listener name.
      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_LOCALHOST://kafka_auth:9192,LISTENER_DOCKER://kafka_auth:9194'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      # Sockets the broker binds (controller + the two client listeners).
      KAFKA_LISTENERS: 'CONTROLLER://kafka_auth:9093,LISTENER_LOCALHOST://kafka_auth:9192,LISTENER_DOCKER://kafka_auth:9194'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_LOCALHOST'
    networks:
      - kafka_net
    extra_hosts:
      # Makes host.docker.internal resolvable inside the container on Linux too.
      - "host.docker.internal:host-gateway"

# FIX: top-level "networks" key was indented by one space, nesting it under
# "services" and making the compose file invalid; it must be at column 0.
networks:
  kafka_net:
    name: kafka_net

字符串

相关问题