Kubernetes上的Kafka和Python应用程序位于单独的pod中- NoBrokersAvailable()

fcipmucu  于 2023-02-07  发布在  Apache
关注(0)|答案(2)|浏览(98)

首先-我是Kubernetes的新手,我可能会省略一些基本原理。
我有一个可用的容器化应用程序,该应用程序使用Docker-Compose进行编排(运行正常),我正在重写它,以便部署到Kubernetes中。我已经通过Kompose将它转换为K8s.yaml文件,并对它进行了一定程度的修改。我很难在运行在不同pod上的Python应用程序和Kafka之间建立连接。Python应用程序经常返回NoBrokersAvailable()错误,无论我尝试应用什么-很明显,它无法连接到代理。我错过了什么?我已经定义了适当的侦听器和网络策略。我正在Minikube上本地运行它与本地Docker图像注册表。
Python应用程序连接到以下地址:KafkaProducer(bootstrap_servers='kafka-service.default.svc.cluster.local:9092')
kafka-deployment.yaml(Dockerfile映像基于confluentinc/cp-kafka:6.2.0,其中添加了主题设置脚本):

apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
    kompose.version: 1.27.0 (b0ed6a2c9)
  creationTimestamp: null
  labels:
    io.kompose.service: kafka
  name: kafka-app
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: kafka
  strategy: {}
  template:
    metadata:
      annotations:
        kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
        kompose.version: 1.27.0 (b0ed6a2c9)
      creationTimestamp: null
      labels:
        io.kompose.network/pipeline-network: "true"
        io.kompose.service: kafka
    spec:
      containers:
        - env:
            - name: KAFKA_LISTENERS
              value: "LISTENER_INTERNAL://0.0.0.0:29092,LISTENER_EXTERNAL://0.0.0.0:9092"
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "LISTENER_INTERNAL://localhost:29092,LISTENER_EXTERNAL://kafka-service.default.svc.cluster.local:9092"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "LISTENER_EXTERNAL:PLAINTEXT,LISTENER_INTERNAL:PLAINTEXT"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "LISTENER_INTERNAL"
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
              value: "0"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper:2181
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
          image: finnhub-streaming-data-pipeline-kafka:latest
          imagePullPolicy: Never
          lifecycle:
            postStart:
              exec: 
                command: ["/bin/sh","-c","/kafka-setup-k8s.sh"]
          name: kafka-app
          ports:
            - containerPort: 9092
            - containerPort: 29092
          resources: {}
      restartPolicy: Always

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
spec:
  selector:
    app: kafka
  ports:
    - protocol: TCP
      name: firstport
      port: 9092
      targetPort: 9092
    - protocol: TCP
      name: secondport
      port: 29092
      targetPort: 29092

finnhub-producer.yaml(又名Python应用程序部署):

apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
    kompose.version: 1.27.0 (b0ed6a2c9)
  creationTimestamp: null
  labels:
    io.kompose.service: finnhubproducer
  name: finnhubproducer
spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: finnhubproducer
  strategy: {}
  template:
    metadata:
      annotations:
        kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
        kompose.version: 1.27.0 (b0ed6a2c9)
      creationTimestamp: null
      labels:
        io.kompose.network/pipeline-network: "true"
        io.kompose.service: finnhubproducer
    spec:
      containers:
        - env:
            - name: KAFKA_PORT
              value: "9092"
            - name: KAFKA_SERVER
              value: kafka-service.default.svc.cluster.local
            - name: KAFKA_TOPIC_NAME
              value: market
          image: docker.io/library/finnhub-streaming-data-pipeline-finnhubproducer:latest
          imagePullPolicy: Never
          name: finnhubproducer
          ports:
            - containerPort: 8001
          resources: {}
      restartPolicy: Always
status: {}

---
apiVersion: v1
kind: Service
metadata:
  annotations:
    kompose.cmd: C:\ProgramData\chocolatey\lib\kubernetes-kompose\tools\kompose.exe convert
    kompose.version: 1.27.0 (b0ed6a2c9)
  creationTimestamp: null
  labels:
    io.kompose.service: finnhubproducer
  name: finnhubproducer
spec:
  ports:
    - name: "8001"
      port: 8001
      targetPort: 8001
  selector:
    io.kompose.service: finnhubproducer
status:
  loadBalancer: {}

pipeline-network-networkpolicy.yaml:

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  creationTimestamp: null
  name: pipeline-network
spec:
  ingress:
    - from:
        - podSelector:
            matchLabels:
              io.kompose.network/pipeline-network: "true"
  podSelector:
    matchLabels:
      io.kompose.network/pipeline-network: "true"

编辑:Kafka图像的停靠文件:

FROM confluentinc/cp-kafka:6.2.0

COPY ./scripts/kafka-setup-k8s.sh /kafka-setup-k8s.sh

kafka-setup-k8s.sh:

# blocks until kafka is reachable
kafka-topics --bootstrap-server localhost:29092 --list

echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server localhost:29092 --create --if-not-exists --topic market --replication-factor 1 --partitions 1

echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server localhost:29092 --list
xcitsw88

xcitsw881#

您的服务的应用选择器为kafka,而部署为kafka-app,因此它们未连接。
我建议你使用Strimzi(或者Confluent for Kubernetes,如果你想使用他们的图片),不要使用Kompose转换你现有的Docker Compose文件,因为它很少能得到正确的网络策略。事实上,你可以完全删除网络标签和网络策略,因为在同一个命名空间中并不真的需要它。
对于Python应用程序,不需要单独定义Kafka主机和端口;KAFKA_BOOTSTRAP_SERVERS使用一个变量,它可以接受多个代理,包括它们的端口

h79rfbju

h79rfbju2#

我已经设法通过从部署中删除服务并运行kubectl expose deployment kafka-app来使其工作。问题来自组合标签。

相关问题