在kubernetes上运行简单的kafka示例

guicsvcw  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(457)

我是kubernetes的新手,尝试在单节点minikube集群上部署单个副本kafka示例。
这是zookeeper服务/部署yml

apiVersion: v1
kind: Service
metadata:
  name: zookeeper-cluster
  labels:
    component: zookeeper
spec:
  ports:
  - name: "2181"
    port: 2181
    targetPort: 2181
  selector:
    component: zookeeper
status:
  loadBalancer: {}

---

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: zookeeper
  name: zookeeper
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      component: zookeeper      
  template:
    metadata:
      labels:
        component: zookeeper
    spec:
      containers:
      - image: zookeeper:3.4.13
        name: zookeeper
        ports:
        - containerPort: 2181
        resources:
          limits:
            memory: "256Mi"
            cpu: "100m"
        volumeMounts:
        - mountPath: /conf
          name: zookeeper-claim0
        - mountPath: /data
          name: zookeeper-claim1
        - mountPath: /datalog
          name: zookeeper-claim2
      restartPolicy: Always
      volumes:
      - name: zookeeper-claim0
        persistentVolumeClaim:
          claimName: zookeeper-claim0
      - name: zookeeper-claim1
        persistentVolumeClaim:
          claimName: zookeeper-claim1
      - name: zookeeper-claim2
        persistentVolumeClaim:
          claimName: zookeeper-claim2
status: {}

这是kafka服务/部署yml

apiVersion: v1
kind: Service
metadata:
  name: kafka-cluster
  labels:
    component: kafka
spec:
  ports:
  - name: "9092"
    port: 9092
    targetPort: 9092
  selector:
    component: kafka
status:
  loadBalancer: {}

---

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: kafka
  name: kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      component: kafka    
  template:
    metadata:
      labels:
        component: kafka
    spec:
      containers:
      - args:
        - start-kafka.sh
        env:
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: LISTENER_BOB
        - name: KAFKA_ADVERTISED_LISTENERS
          value: LISTENER_BOB://:9092
        - name: KAFKA_LISTENERS
          value: LISTENER_BOB://:9092
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value:  LISTENER_BOB:PLAINTEXT
        - name: KAFKA_BROKER_ID
          value: "1"
        - name: KAFKA_LOG_DIRS
          value: /kafka/kafka-logs
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-cluster:2181
        image: wurstmeister/kafka:2.12-2.4.1
        name: kafka
        ports:
        - containerPort: 9092
        resources:
          limits:
            memory: "256Mi"
            cpu: "200m"
        volumeMounts:
        - mountPath: /kafka/kafka-logs
          name: kafka-claim0
      restartPolicy: Always
      volumes:
      - name: kafka-claim0
        persistentVolumeClaim:
          claimName: kafka-claim0
status: {}

当试图从Kafka上的另一个应用程序访问Kafka时-cluster:9092,它也作为部署运行,它抛出unsolvedAddressException。其中kafka-6799c65d58-f6tbt:9092 is pod名称

java.io.IOException: Can't resolve address:**kafka-6799c65d58-f6tbt:9092**
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864)
    at org.apache.kafka.clients.NetworkClient.access$700(NetworkClient.java:64)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1035)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:920)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:508)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: java.nio.channels.UnresolvedAddressException: null
    at java.base/sun.nio.ch.Net.checkAddress(Net.java:130)
    at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:675)
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233)
    ... 9 common frames omitted

我在配置它时是否犯了错误?或者有别的选择吗?

nxowjjhe

nxowjjhe1#

我能想到几个问题。
服务类型尚未显式设置,因此,默认为 ClusterIP 只能在群集中访问的类型(k8s内部dns应工作)。你可以用 NodePort 或者 LoadBalancer 如果你想把它暴露在外面的世界。
即使您的应用程序运行在同一个k8s集群和不同的k8s命名空间中,也必须使用 kafka-cluster.mynamespace 作为内部地址。
查看此处了解更多信息:https://kubernetes.io/docs/concepts/services-networking/

f45qwnt8

f45qwnt82#

看起来kafka代理正在将自己的主机名(kafka-6799c65d58-f6tbt)作为fqdn进行广告,fqdn与pod名称相同。dns无法解析部署吊舱的名称。
如果你看一看Kafka的 Helm 图,也就是这张图,你会发现它们使用的是状态集。statefulset允许解析pod的ip地址。来看看k8s文档中的工作原理。
您也可以尝试将kafka\u侦听器设置为:

- name: MY_POD_IP
  valueFrom:
    fieldRef:
      fieldPath: status.podIP
- name: KAFKA_ADVERTISED_LISTENERS
  value: "LISTENER_BOB://$(MY_POD_IP):9092/"

但在更改副本数量时,它的扩展性不好。

相关问题