KEDA配置如下
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: job-message-consumer
spec:
jobTargetRef:
parallelism: 1
template:
spec:
containers:
- name: {{ .Values.JobMessageConsumer.name }}
image: {{ .Values.image.repository }}
args:
- python
- manage.py
- jobs_consumer
imagePullPolicy: {{ .Values.image.pullPolicy }}
env:
- name: SSL_CA_LOCATION
value: {{ .Values.kafka.sslCaDirMount }}/ca.crt
- name: SSL_CA_DIR_MOUNT
value: {{ .Values.kafka.sslCaDirMount }}
- name: SSL_SECRET_NAME
value: {{ .Values.kafka.sslSecretName }}
- name: SERVICE_CONSUMER_GROUP_ID
value: {{ .Values.kafka.CONSUMER_GROUP_ID }}
- name: KAFKA_SERVICE_JOBS_TOPIC
value: {{ .Values.job_consumer_service.consumption_topic }}
{{- include "env" . | nindent 14 }}
resources:
limits:
cpu: "2"
memory: "4Gi"
requests:
cpu: "500m"
memory: "500Mi"
volumeMounts:
- name: user-certs
mountPath: {{ .Values.kafka.sslUserCertDir }}
- name: ca-certs
mountPath: {{ .Values.kafka.sslCaDirMount }}
volumes:
- name: user-certs
secret:
secretName: {{ .Release.Name }}-kafka-user-certs
- name: ca-certs
secret:
secretName: kafka-cluster-ca-cert
pollingInterval: 20
maxReplicaCount: 100
successfulJobsHistoryLimit: 5
failedJobsHistoryLimit: 5
triggers:
- type: kafka
metadata:
bootstrapServers: "kafka-kafka-bootstrap.kafka:9093"
consumerGroup: "JobConsumer"
topic: {{ .Values.job_consumer_service.consumption_topic }}
lagThreshold: "1"
offsetResetPolicy: latest
authenticationRef:
name: {{ .Release.Name }}-keda-trigger-auth-kafka-credential
KEDA-作业需要运行很长时间才能完成任务,因此出现了ScaledJob。但是,使用上面的ScaledJobs配置,KEDA作业每隔20秒就会扩展一次。我希望KEDA作业仅基于Kafka触发器上的消息进行扩展,而不是基于轮询间隔。有人能在这件事上帮我吗
1条答案
按热度按时间iq0todco1#
请将此参数添加到
triggers
部分中的kafka
触发器:如果将
2.7.0
及更高版本中添加的此scaleToZeroOnInvalidOffset
参数设置为"true"
,则当分区没有有效偏移量[1]时,该分区的使用者将扩展到zero
。[1]-https://keda.sh/docs/2.7/scalers/apache-kafka/