Keda ScaledJOB Kafka-触发错误的缩放

z0qdvdin  于 2022-10-06  发布在  Kafka
关注(0)|答案(1)|浏览(221)

KEDA配置如下

apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: job-message-consumer
spec:
  jobTargetRef:
    parallelism: 1
    template:
      spec:
        containers:
          - name: {{ .Values.JobMessageConsumer.name }}
            image: {{ .Values.image.repository }}
            args:
              - python
              - manage.py
              - jobs_consumer
            imagePullPolicy: {{ .Values.image.pullPolicy }}
            env:
              - name: SSL_CA_LOCATION
                value: {{ .Values.kafka.sslCaDirMount }}/ca.crt
              - name: SSL_CA_DIR_MOUNT
                value: {{ .Values.kafka.sslCaDirMount }}
              - name: SSL_SECRET_NAME
                value: {{ .Values.kafka.sslSecretName }}
              - name: SERVICE_CONSUMER_GROUP_ID
                value: {{ .Values.kafka.CONSUMER_GROUP_ID }}
              - name: KAFKA_SERVICE_JOBS_TOPIC
                value: {{ .Values.job_consumer_service.consumption_topic }}
              {{- include "env" . | nindent 14 }}
            resources:
              limits:
                cpu: "2"
                memory: "4Gi"
              requests:
                cpu: "500m"
                memory: "500Mi"
            volumeMounts:
              - name: user-certs
                mountPath: {{ .Values.kafka.sslUserCertDir }}
              - name: ca-certs
                mountPath: {{ .Values.kafka.sslCaDirMount }}
        volumes:
          - name: user-certs
            secret:
              secretName: {{ .Release.Name }}-kafka-user-certs
          - name: ca-certs
            secret:
              secretName: kafka-cluster-ca-cert
  pollingInterval: 20
  maxReplicaCount: 100
  successfulJobsHistoryLimit: 5
  failedJobsHistoryLimit: 5
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: "kafka-kafka-bootstrap.kafka:9093"
        consumerGroup: "JobConsumer"
        topic: {{ .Values.job_consumer_service.consumption_topic }}
        lagThreshold: "1"
        offsetResetPolicy: latest
      authenticationRef:
        name: {{ .Release.Name }}-keda-trigger-auth-kafka-credential

KEDA-作业需要运行很长时间才能完成任务,因此出现了ScaledJob。但是,使用上面的ScaledJobs配置,KEDA作业每隔20秒就会扩展一次。我希望KEDA作业仅基于Kafka触发器上的消息进行扩展,而不是基于轮询间隔。有人能在这件事上帮我吗

iq0todco

iq0todco1#

请将此参数添加到triggers部分中的kafka触发器:

scaleToZeroOnInvalidOffset: "true"

如果将2.7.0及更高版本中添加的此scaleToZeroOnInvalidOffset参数设置为"true",则当分区没有有效偏移量[1]时,该分区的使用者将扩展到zero

[1]-https://keda.sh/docs/2.7/scalers/apache-kafka/

相关问题