Updating Kafka in Kubernetes causes downtime

Posted by qacovj5a on 2021-06-06 in Kafka

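I run a 4-broker Kafka cluster in Kubernetes. The replication factor is 3 and the minimum ISR (min.insync.replicas) is 2.
There is also a producer service (running Spring Stream) that generates messages and a consumer service that reads them from the topic. I am now trying to update the Kafka cluster with a rolling update, expecting no downtime, but during the update the producer's log fills up with the following error: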

org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

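By my calculation, losing 1 broker should not be a problem, because the minimum ISR is 2. However, the producer service seems to be unaware of the rolling update and keeps sending messages to the same broker.
Is there any way to solve this?
Here is my Kafka StatefulSet: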

apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
  namespace: default
  labels:
    app: kafka
spec:
  serviceName: kafka
  replicas: 4
  updateStrategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: kafka
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9308"
    spec:
      nodeSelector:
        middleware.node: "true"
      imagePullSecrets:
      - name: nexus-registry
      terminationGracePeriodSeconds: 300
      containers:
      - name: kafka
        image: kafka:2.12-2.1.0
        imagePullPolicy: IfNotPresent

        resources:
          limits:
            cpu: 3000m
            memory: 1800Mi
          requests:
            cpu: 2000m
            memory: 1800Mi
        env:

        # Replication
        - name: KAFKA_DEFAULT_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_MIN_INSYNC_REPLICAS
          value: "2"

        # Protocol Version
        - name: KAFKA_INTER_BROKER_PROTOCOL_VERSION
          value: "2.1"
        - name: KAFKA_LOG_MESSAGE_FORMAT_VERSION
          value: "2.1"

        - name: ENABLE_AUTO_EXTEND
          value: "true"
        - name: KAFKA_DELETE_TOPIC_ENABLE
          value: "true"
        - name: KAFKA_RESERVED_BROKER_MAX_ID
          value: "999999999"
        - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
          value: "true"
        - name: KAFKA_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR
          value: "10"
        - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_LOG_RETENTION_BYTES
          value: "1800000000000"
        - name: KAFKA_ADVERTISED_HOST_NAME
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: KAFKA_OFFSETS_RETENTION_MINUTES
          value: "10080"
        - name: KAFKA_ZOOKEEPER_CONNECT
          valueFrom:
            configMapKeyRef:
              name: zk-config
              key: zk.endpoints
        - name: KAFKA_LOG_DIRS
          value: /kafka/kafka-logs
        ports:
        - name: kafka
          containerPort: 9092
        - name: prometheus
          containerPort: 7071
        volumeMounts:
        - name: data
          mountPath: /kafka
        readinessProbe:
          tcpSocket:
            port: 9092
          timeoutSeconds: 1
          failureThreshold: 12
          initialDelaySeconds: 10
          periodSeconds: 30
          successThreshold: 1
      - name: kafka-exporter
        image: danielqsj/kafka-exporter:latest
        resources:
          requests:
            cpu: 100m
            memory: 100Mi
          limits:
            cpu: 500m
            memory: 500Mi
        ports:
        - containerPort: 9308
  volumeClaimTemplates:
  - metadata:
      name: data
      labels:
        app: kafka
    spec:
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 2000Gi
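
One thing worth checking on the producer side: in the Kafka client, `NotLeaderForPartitionException` is a retriable error, so a producer that is allowed to retry will normally refresh its metadata and resend to the new partition leader. Below is a minimal sketch of producer settings, assuming the service uses the Spring Cloud Stream Kafka binder (the post only says "Spring Stream") and is configured through an `application.yml`. The file and all values are hypothetical and illustrative, not tuned recommendations, and the binder property names should be checked against the binder version actually in use.

```yaml
# application.yml of the producer service (hypothetical; assumes the Spring Cloud Stream Kafka binder)
spring:
  cloud:
    stream:
      kafka:
        binder:
          # Wait for all in-sync replicas to acknowledge each write
          requiredAcks: all
          # Entries below are passed through to the Kafka producer client
          producerProperties:
            # Resend after retriable errors such as NotLeaderForPartitionException
            retries: 10
            # Pause between attempts, leaving time for a new leader to be elected
            retry.backoff.ms: 500
            # Upper bound on how long cached cluster metadata is kept before a forced refresh
            metadata.max.age.ms: 30000
```

With a replication factor of 3 and `min.insync.replicas` set to 2, writes using `acks=all` can still be accepted while a single broker is down, which is consistent with the calculation in the question. If the errors appear only briefly while each broker restarts and the messages are eventually delivered, the log entries may be transient retries rather than actual downtime.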
