在kubernetes集群外访问bitnami/kafka

dphi5xsq  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(901)

我目前正在使用bitnami/kafka图像(https://hub.docker.com/r/bitnami/kafka)在Kubernetes部署。
Kubernetes大师:1
kubernetes工人:3人
在集群中,其他应用程序可以找到kafka。当尝试从集群外部访问kafka容器时,会出现问题。当我读到一点时,我读到我们需要设置属性“adverted.listener=p”lainttext://hostname:端口号“的外部Kafka客户端。
我正在引用“https://github.com/bitnami/charts/tree/master/bitnami/kafka". 在my values.yaml文件中我添加了
值.yaml
播音员1:10.21.0.191
和statefulset.yaml

- name: KAFKA_CFG_ADVERTISED_LISTENERS
      value: 'PLAINTEXT://{{ .Values.advertisedListeners }}:9092'

就一个Kafka的例子来说,它工作得很好。
但是对于3节点kafka集群,我更改了如下一些配置:values.yaml
播音员1:10.21.0.191
播音员2:10.21.0.192
播音员3:10.21.0.193
和statefulset.yaml

- name: KAFKA_CFG_ADVERTISED_LISTENERS
      {{- if $MY_POD_NAME := "kafka-0" }}
      value: 'PLAINTEXT://{{ .Values.advertisedListeners1 }}:9092'
      {{- else if $MY_POD_NAME := "kafka-1" }}
      value: 'PLAINTEXT://{{ .Values.advertisedListeners2 }}:9092'
      {{- else if $MY_POD_NAME := "kafka-2" }}
      value: 'PLAINTEXT://{{ .Values.advertisedListeners3 }}:9092'
      {{- end }}

预期结果是,所有3个kafka示例都应将advised.listener属性设置为worker nodes ip address。
例子:
Kafka-0-->“plaintext://10.21.0.191:9092"
Kafka-1-->“plaintext://10.21.0.192:9092"
Kafka-3-->“plaintext://10.21.0.193:9092"
目前只有一个Kafka吊舱在启动和运行,其他两个将崩溃回退状态。
另外两个吊舱显示错误为:
[2019-10-20 13:09:37753]信息[logdirfailurehandler]:正在启动(kafka.server.replicamanager$logdirfailurehandler)[2019-10-20 13:09:37786]错误[kafkaserver id=1002]kafkaserver启动期间出现致命错误。准备关闭(kafka.server.kafkaserver)java.lang.illegalargumentexception:请求失败:播发侦听器中配置的端点10.21.0.191:9092已由代理1001在scala.predef$.require(predef)注册。scala:224)在kafka.server.kafkaserver$$anonfun$createbrokerinfo$2.apply(kafkaserver。scala:399)在kafka.server.kafkaserver$$anonfun$createbrokerinfo$2.apply(kafkaserver。scala:397)在scala.collection.mutable.resizablearray$class.foreach(resizablearray。scala:59)在scala.collection.mutable.arraybuffer.foreach(arraybuffer。scala:48)在kafka.server.kafkaserver.createbrokerinfo(kafkaserver。scala:397)在kafka.server.kafkaserver.startup(kafkaserver。scala:261)在kafka.server.kafkaserverstartable.startup(kafkaserverstartable。scala:38)在Kafka。Kafka$.main(Kafka。scala:84)在Kafka。Kafka。梅因(Kafka。斯卡拉)
这意味着statefulset.yaml中应用的逻辑不起作用。有人能帮我解决这个问题吗。。?
任何帮助都将不胜感激。。
输出 kubectl get statefulset kafka -o yaml ```
apiVersion: apps/v1
kind: StatefulSet
metadata:
creationTimestamp: "2019-10-29T07:04:12Z"
generation: 1
labels:
app.kubernetes.io/component: kafka
app.kubernetes.io/instance: kafka
app.kubernetes.io/managed-by: Tiller
app.kubernetes.io/name: kafka
helm.sh/chart: kafka-6.0.1
name: kafka
namespace: default
resourceVersion: "12189730"
selfLink: /apis/apps/v1/namespaces/default/statefulsets/kafka
uid: d40cfd5f-46a6-49d0-a9d3-e3a851356063
spec:
podManagementPolicy: Parallel
replicas: 3
revisionHistoryLimit: 10
selector:
matchLabels:
app.kubernetes.io/component: kafka
app.kubernetes.io/instance: kafka
app.kubernetes.io/name: kafka
serviceName: kafka-headless
template:
metadata:
creationTimestamp: null
labels:
app.kubernetes.io/component: kafka
app.kubernetes.io/instance: kafka
app.kubernetes.io/managed-by: Tiller
app.kubernetes.io/name: kafka
helm.sh/chart: kafka-6.0.1
name: kafka
spec:
containers:
- env:
- name: MY_POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: MY_POD_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
- name: KAFKA_CFG_ZOOKEEPER_CONNECT
value: kafka-zookeeper
- name: KAFKA_PORT_NUMBER
value: "9092"
- name: KAFKA_CFG_LISTENERS
value: PLAINTEXT://:$(KAFKA_PORT_NUMBER)
- name: KAFKA_CFG_ADVERTISED_LISTENERS
value: PLAINTEXT://10.21.0.191:9092
- name: ALLOW_PLAINTEXT_LISTENER
value: "yes"
- name: KAFKA_CFG_BROKER_ID
value: "-1"
- name: KAFKA_CFG_DELETE_TOPIC_ENABLE
value: "false"
- name: KAFKA_HEAP_OPTS
value: -Xmx1024m -Xms1024m
- name: KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES
value: "10000"
- name: KAFKA_CFG_LOG_FLUSH_INTERVAL_MS
value: "1000"
- name: KAFKA_CFG_LOG_RETENTION_BYTES
value: "1073741824"
- name: KAFKA_CFG_LOG_RETENTION_CHECK_INTERVALS_MS
value: "300000"
- name: KAFKA_CFG_LOG_RETENTION_HOURS
value: "168"
- name: KAFKA_CFG_LOG_MESSAGE_FORMAT_VERSION
- name: KAFKA_CFG_MESSAGE_MAX_BYTES
value: "1000012"
- name: KAFKA_CFG_LOG_SEGMENT_BYTES
value: "1073741824"
- name: KAFKA_CFG_LOG_DIRS
value: /bitnami/kafka/data
- name: KAFKA_CFG_DEFAULT_REPLICATION_FACTOR
value: "1"
- name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "1"
- name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
value: "1"
- name: KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
value: https
- name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR
value: "1"
- name: KAFKA_CFG_NUM_IO_THREADS
value: "8"
- name: KAFKA_CFG_NUM_NETWORK_THREADS
value: "3"
- name: KAFKA_CFG_NUM_PARTITIONS
value: "1"
- name: KAFKA_CFG_NUM_RECOVERY_THREADS_PER_DATA_DIR
value: "1"
- name: KAFKA_CFG_SOCKET_RECEIVE_BUFFER_BYTES
value: "102400"
- name: KAFKA_CFG_SOCKET_REQUEST_MAX_BYTES
value: "104857600"
- name: KAFKA_CFG_SOCKET_SEND_BUFFER_BYTES
value: "102400"
- name: KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS
value: "6000"
image: docker.io/bitnami/kafka:2.3.0-debian-9-r88
imagePullPolicy: IfNotPresent
livenessProbe:
failureThreshold: 2
initialDelaySeconds: 10
periodSeconds: 10
successThreshold: 1
tcpSocket:
port: kafka
timeoutSeconds: 5
name: kafka
ports:
- containerPort: 9092
name: kafka
protocol: TCP
readinessProbe:
failureThreshold: 6
initialDelaySeconds: 5
periodSeconds: 10
successThreshold: 1
tcpSocket:
port: kafka
timeoutSeconds: 5
resources: {}
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
volumeMounts:
- mountPath: /bitnami/kafka
name: data
dnsPolicy: ClusterFirst
restartPolicy: Always
schedulerName: default-scheduler
securityContext:
fsGroup: 1001
runAsUser: 1001
terminationGracePeriodSeconds: 30
updateStrategy:
type: RollingUpdate
volumeClaimTemplates:

  • metadata:
    creationTimestamp: null
    name: data
    spec:
    accessModes:
    • ReadWriteOnce
      resources:
      requests:
      storage: 8Gi
      volumeMode: Filesystem
      status:
      phase: Pending
      status:
      collisionCount: 0
      currentReplicas: 3
      currentRevision: kafka-56ff499d74
      observedGeneration: 1
      readyReplicas: 1
      replicas: 3
      updateRevision: kafka-56ff499d74
      updatedReplicas: 3
fsi0uk1n

fsi0uk1n1#

我看到您在为statefulset中的不同pod传递不同的环境变量时遇到了一些问题。
您正在尝试使用 Helm 模板来实现这一点:

- name: KAFKA_CFG_ADVERTISED_LISTENERS
  {{- if $MY_POD_NAME := "kafka-0" }}
  value: 'PLAINTEXT://{{ .Values.advertisedListeners1 }}:9092'
  {{- else if $MY_POD_NAME := "kafka-1" }}
  value: 'PLAINTEXT://{{ .Values.advertisedListeners2 }}:9092'
  {{- else if $MY_POD_NAME := "kafka-2" }}
  value: 'PLAINTEXT://{{ .Values.advertisedListeners3 }}:9092'
  {{- end }}

在helm模板指南文档中,您可以找到以下解释:
在helm模板中,变量是对另一个对象的命名引用。它遵循$name的形式。变量用一个特殊的赋值运算符::=。
现在让我们看看您的代码:

{{- if $MY_POD_NAME := "kafka-0" }}

这是变量赋值,不是比较,赋值之后, if 语句将此表达式计算为 true 这就是为什么在你的舞台上 yaml 您可以将其视为输出:

- name: KAFKA_CFG_ADVERTISED_LISTENERS
    value: PLAINTEXT://10.21.0.191:9092

为了让它像预期的那样工作,你不应该使用 Helm 模板。这是行不通的。
一种方法是为每个kafka节点创建单独的环境变量,并将所有这些变量传递给所有pod,如下所示:

- env:
  - name: MY_POD_NAME
    valueFrom:
      fieldRef:
        apiVersion: v1
        fieldPath: metadata.name
  - name: KAFKA_0
      value: 10.21.0.191
  - name: KAFKA_1
      value: 10.21.0.192
  - name: KAFKA_2
      value: 10.21.0.193

# - name: KAFKA_CFG_ADVERTISED_LISTENERS

# value: PLAINTEXT://$MY_POD_NAME:9092

也可以创建自己的docker图像和修改后的开始脚本,将导出 KAFKA_CFG_ADVERTISED_LISTENERS 具有适当值的变量取决于 MY_POD_NAME .
如果你不想创建自己的图像,你可以创建一个 ConfigMap 已修改 entrypoint.sh 把它装在旧的地方 entrypoint.sh (您也可以使用任何其他文件,只需在此处查看有关如何构建Kafka图像的更多信息)。
安装 ConfigMap 看起来像这样:

apiVersion: v1
kind: Pod
metadata:
  name: test
spec:
  containers:
    - name: test-container
      image: docker.io/bitnami/kafka:2.3.0-debian-9-r88
      volumeMounts:
      - name: config-volume
        mountPath: /entrypoint.sh
        subPath: entrypoint.sh
  volumes:
    - name: config-volume
      configMap:
        # Provide the name of the ConfigMap containing the files you want
        # to add to the container
        name: kafka-entrypoint-config
        defaultMode: 0744 # remember to add proper (executable) permissions

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-entrypoint-config
  namespace: default
data:
  entrypoint.sh: |
    #!/bin/bash
    # Here add modified entrypoint script

如果有用请告诉我。

相关问题