从集群外部收听kubernetes上的kafak

new9mtju  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(496)

我在google云平台上运行一个kubernetes集群并安装了kafka(https://hub.kubeapps.com/charts/bitnami/kafka)在上面用舵图。我还有一个运行python pod的部署。我用负载均衡器暴露了kafka和zookeper。这是我跑步时得到的 kubectl get all ,(ip地址已更改)

kubectl get all

NAME                                    READY   STATUS    RESTARTS   AGE
pod/my-kafka-0                          1/1     Running   1          3h2m
pod/my-kafka-zookeepe-0                 1/1     Running   0          3h2m
pod/my-python-6c746645f5-5xvsb          1/1     Running   0          34m

NAME                                        TYPE           CLUSTER-IP    EXTERNAL-IP     PORT(S)                                        AGE
service/kubernetes                          ClusterIP      10.10.0.1     <none>          443/TCP                                        3h16m
service/my-kafka                            LoadBalancer   10.10.0.110   35.35.135.150   9092:30769/TCP                                 3h2m
service/my-kafka-headless                   ClusterIP      None          <none>          9092/TCP                                       3h2m
service/my-kafka-zookeepe                   LoadBalancer   10.10.0.45    35.35.135.160   2181:32740/TCP,2888:31095/TCP,3888:30057/TCP   3h2m
service/my-kafka-zookeepe-headless          ClusterIP      None          <none>          2181/TCP,2888/TCP,3888/TCP                     3h2m
service/my-python                           ClusterIP      10.10.10.80   <none>          9999/TCP                                       171m

NAME                              DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/my-python         1         1         1            1           136m

NAME                                         DESIRED   CURRENT   READY   AGE
replicaset.apps/my-python-6c746645f5         1         1         1       35m
replicaset.apps/my-python-848f769cd          0         0         0       136m

NAME                                        DESIRED   CURRENT   AGE
statefulset.apps/my-kafka                   1         1         3h2m
statefulset.apps/my-kafka-zookeepe          1         1         3h2m

如果我打开python pod的终端,我就可以访问kafka服务。我可以使用python创建主题,创建生产者和消费者,而且它可以毫无问题地工作。下面是我用来测试这个的代码片段。

kubectl exec -it my-python-6c746645f5-5xvsb /bin/bash
from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(bootstrap_servers="my-kafka-headless.default.svc.cluster.local:9092", client_id='test')

topic_list = []
topic_list.append(NewTopic(name="test-topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='my-kafka-headless.default.svc.cluster.local:9092')

producer.send('test-topic', b'message')
from kafka import KafkaConsumer

while True:

    consumer = KafkaConsumer('test-topic',
                            bootstrap_servers='my-kafka-headless.default.svc.cluster.local:9092')
    for msg in consumer:
        print (msg)

引导服务器的值是从kafka配置yaml文件中找到的。

- name: KAFKA_CFG_ADVERTISED_LISTENERS
  value: PLAINTEXT://$(MY_POD_NAME).my-kafka-headless.default.svc.cluster.local:$(KAFKA_PORT_NUMBER)

到目前为止,一切似乎都很顺利。现在,如果我尝试使用外部ip从外部访问kafka代理,它似乎不起作用。我可以使用以下内容查看主题。

c= kafka.KafkaConsumer(bootstrap_servers=["35.35.135.150:9092"])
c.topics()
set([test-topic'])

但是,我看不到使用以下命令的任何消息。

from kafka import KafkaConsumer

while True:

    consumer = KafkaConsumer('test-topic',
                            bootstrap_servers=["35.35.135.150:9092"])
    for msg in consumer:
        print (msg)

我也没有任何错误。我似乎不知道我做错了什么。

iqxoj9l9

iqxoj9l91#

多亏了cricket的评论,我才得以解决这个问题。我修改了配置文件,将外部端点包含为播发侦听器,并将内部服务名称移动到侦听器。

- name: KAFKA_CFG_LISTENERS
  value: PLAINTEXT://$(MY_POD_NAME).my-kafka-headless.default.svc.cluster.local:$(KAFKA_PORT_NUMBER)
- name: KAFKA_CFG_ADVERTISED_LISTENERS
  value: PLAINTEXT://35.35.135.150:9092

相关问题