连接到docker中运行的kafka

guykilcj  于 2021-05-29  发布在  Spark
关注(0)|答案(3)|浏览(534)

我在本地机器上设置了一个单节点kafka docker容器,就像合流文档中描述的那样(步骤2-3)。
此外,我还公开了zookeeper的端口2181和kafka的端口9092,以便能够从本地计算机上运行的客户端连接到它们:

$ docker run -d \
    -p 2181:2181 \
    --net=confluent \
    --name=zookeeper \
    -e ZOOKEEPER_CLIENT_PORT=2181 \
    confluentinc/cp-zookeeper:4.1.0

$ docker run -d \
    --net=confluent \
    --name=kafka \
    -p 9092:9092 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
    -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
    confluentinc/cp-kafka:4.1.0

问题:当我尝试从主机连接到kafka时,连接失败,因为 can't resolve address: kafka:9092 .
以下是我的java代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "KafkaExampleProducer");
props.put("key.serializer", LongSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<Long, String> producer = new KafkaProducer<>(props);
ProducerRecord<Long, String> record = new ProducerRecord<>("foo", 1L, "Test 1");
producer.send(record).get();
producer.flush();

例外情况:

java.io.IOException: Can't resolve address: kafka:9092
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:265) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:266) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:176) [kafka-clients-2.0.0.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
Caused by: java.nio.channels.UnresolvedAddressException: null
    at sun.nio.ch.Net.checkAddress(Net.java:101) ~[na:1.8.0_144]
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[na:1.8.0_144]
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233) ~[kafka-clients-2.0.0.jar:na]
    ... 7 common frames omitted

问:如何连接Kafka在码头运行?我的代码是从主机上运行的,不是docker。
注意:我知道理论上我可以玩dns设置和 /etc/hosts 但这是一个解决办法-不应该是这样的。
这里也有类似的问题,但它是基于 ches/kafka 形象。我用 confluentinc 基于不同的图像。

but5z9lq

but5z9lq1#

当您第一次连接到Kafka节点时,它将返回所有Kafka节点和要连接的url。然后您的应用程序将尝试直接连接到每个Kafka。
问题总是什么是Kafka会给你的网址?这就是为什么会有 KAFKA_ADVERTISED_LISTENERS Kafka将用它来告诉全世界如何访问它。
现在对于您的用例,有许多小问题需要考虑:
假设你设定了 plaintext://kafka:9092 如果docker compose中有一个使用kafka的应用程序,这是可以的。此应用程序将从kafka获取带有 kafka 可通过docker网络解决。
如果尝试从主系统或不在同一docker网络中的其他容器进行连接,则会失败,因为 kafka 无法解析名称。
==>要解决这个问题,你需要有一个特定的dns服务器,比如服务发现服务器,但是对于小东西来说这是个大麻烦。或者手动设置 kafka 每个容器中容器ip的名称 /etc/hosts 如果你设置 plaintext://localhost:9092 如果您有端口Map(在启动kafka时为-p9092:9092),这在您的系统上就可以了
如果从容器(是否相同docker网络)上的应用程序进行测试(localhost是容器本身,而不是kafka容器),则会失败
==>如果您有此问题,并且希望在另一个容器中使用kafka客户端,解决此问题的一种方法是共享两个容器的网络(相同的ip)
最后一个选项:在名称中设置ip: plaintext://x.y.z.a:9092 这对每个人都没问题。。。但是你怎么能得到x.y.z.a的名字呢?
唯一的方法是在启动容器时对该ip进行硬编码: docker run .... --net confluent --ip 10.x.y.z ... . 请注意,您需要将ip适配到中的一个有效ip confluent 子网。

9avjhtql

9avjhtql2#

免责声明

热释光;博士-归根结底,这都是相同的ApacheKafka运行在一个容器。你只需要看它是如何配置的。又是哪些变量使之如此。

以下使用confluentinc docker映像,而不是 wurstmeister/kafka ,虽然有类似的配置,但我没有尝试过。如果使用该图像,请阅读他们的连接wiki。

没有违反法律的 wurstmeister 图片,但它是由社区维护的,不是在自动的ci/cd版本中构建的。。。bitnami是类似的极简主义者,在多个云提供商中运行。为了 bitnami Kafka图片,请参阅他们的自述 debezium/kafka 这里提到了上面的文档。注意:不推荐使用播发的主机和端口设置。播发的听众涵盖了这两个方面 spotify/kafka 已弃用且过时。 fast-data-dev 对于一个多合一的解决方案是很好的,但它是臃肿的
对于补充阅读,一个功能齐全的 docker-compose ,以及网络图,请参见@rmoff的博客

回答

confluent quickstart(docker)文档假定所有生产和消费请求都在docker网络中。
您可以通过在kafka客户机自己的容器中运行kafka客户机代码来解决这个问题,但是如果不是这样,您需要添加更多的环境变量,以便在外部公开容器,同时使其在docker网络中工作。
首先添加的协议Map PLAINTEXT_HOST:PLAINTEXT 将侦听器协议Map到kafka协议
密钥: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP 价值: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT 然后在不同的端口上设置两个播发侦听器( kafka:9092 这里指的是docker容器名)。请注意,协议与上面Map的右侧值匹配
密钥: KAFKA_ADVERTISED_LISTENERS 价值: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 运行容器时,添加 -p 29092:29092 对于主机端口Map
热释光;dr(具有上述设置)
在docker网络之外运行任何kafka客户端(包括本地安装的cli工具)时,请使用 localhost:29092 对于引导服务器和 localhost:2181 Zookeeper
如果尝试从外部服务器进行连接,则需要播发主机的外部主机名/ip,而不是localhost
在docker网络中运行应用程序时,请使用 kafka:9092 对于引导服务器和 zookeeper:2181 对于zookeeper,就像其他docker服务通信一样
请参阅示例compose文件以了解完整的汇合堆栈

附录

对于任何对kubernetes部署感兴趣的人:https://operatorhub.io/?keyword=kafka

6kkfgxo0

6kkfgxo03#

在Zookeeper之前
docker容器运行--名称zookeeper-p 2181:2181 zookeeper
Kafka之后
docker容器运行--名称kafka-p 9092:9092-e kafka \u zookeeper \u connect=192.168.8.128:2181-e kafka \u播发的\u侦听器=plaintext://ip_address_of_your_computer_but_not_localhost!!!:9092-eKafka\u偏移量\u主题\u复制\u因子=1 confluentinc/cpKafka
在Kafka消费者和生产者配置

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.128:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.128:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

我用这些规则来运行我的项目。祝你好运伙计。

相关问题