Empty messages in Kafka topic from Kafka Connect with the JSON SpoolDir plugin

Asked by myss37ts on 2021-06-04 · Kafka

I have the following setup: Zookeeper, Kafka, Schema Registry and Kafka Connect, all running in separate pods in Kubernetes. I want to drop some JSON files into a folder watched by Kafka Connect with the SpoolDir connector, which should send them to a specific Kafka topic. I want to use them as a cold start for our application. This is the Kubernetes configuration for Kafka Connect:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-connect
  labels:
    app.kubernetes.io/name: kafka-connect
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: kafka-connect
  serviceName: kafka-connect-headless
  podManagementPolicy: OrderedReady
  updateStrategy:
    type: OnDelete
  replicas: 1
  template:
    metadata:
      labels:
        helm.sh/chart: kafka-0.21.0
        app.kubernetes.io/name: kafka-connect
    spec:
      terminationGracePeriodSeconds: 10
      volumes:
        - hostPath:
            path: /dependency/
            type: Directory
          name: dependency-data
      initContainers:
        - name: wait-for-schema-registry
          image: alpine:3.12.0
          command: ['sh', '-c', "until nslookup schema-registry-svc.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for schema registry; sleep 2; done"]
      containers:
        - name: kafka-connect
          image: "confluentinc/cp-kafka-connect:5.5.0"
          imagePullPolicy: "IfNotPresent"
          volumeMounts:
            - mountPath: /dependency/
              name: dependency-data
          ports:
            - containerPort: 8083
              name: kafka-connect
              protocol: TCP
          command:
            - sh
            - -exc
            - |
              mkdir -p /dependency/unprocessed && \
              mkdir -p /dependency/json/processed && mkdir -p /dependency/json/error && \
              confluent-hub install --no-prompt jcustenborder/kafka-connect-spooldir:2.0.43 && \
              confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:5.5.0 && \
              exec /etc/confluent/docker/run && \
              sleep infinity
          env:
            - name: CONNECT_REST_ADVERTISED_HOST_NAME
              value: "kafka-connect"
            - name: KAFKA_HEAP_OPTS
              value: "-Xms2048M -Xmx2048M"
            - name: CONNECT_BOOTSTRAP_SERVERS
              value: "kafka:9092"
            - name: CONNECT_GROUP_ID
              value: "kafka-connect"
            - name: CONNECT_PLUGIN_PATH
              value: "/usr/share/java,/usr/share/confluent-hub-components/"
            - name: CONNECT_KEY_CONVERTER
              value: "org.apache.kafka.connect.storage.StringConverter"
            - name: CONNECT_VALUE_CONVERTER
              value: "io.confluent.connect.avro.AvroConverter"
            - name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
              value: "http://schema-registry-svc:8081"
            - name: CONNECT_INTERNAL_KEY_CONVERTER
              value: "org.apache.kafka.connect.json.JsonConverter"
            - name: CONNECT_INTERNAL_VALUE_CONVERTER
              value: "org.apache.kafka.connect.json.JsonConverter"
            - name: CONNECT_CONFIG_STORAGE_TOPIC
              value: "kafka-connect-config"
            - name: CONNECT_OFFSET_STORAGE_TOPIC
              value: "kafka-connect-offset"
            - name: CONNECT_STATUS_STORAGE_TOPIC
              value: "kafka-connect-status"
            - name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
              value: "1"
            - name: CONNECT_LOG4J_ROOT_LOGLEVEL
              value: "DEBUG"
            - name: CONNECT_CONNECT_PROTOCOL
              value: "compatible"
            - name: CONNECT_TOPIC_CREATION_ENABLE
              value: "true"

The SpoolDir configuration I'm posting to the Connect REST API:

curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "kafka-connect-spooldir",
  "config": {
    "connector.class":"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector",
    "tasks.max":"1",
    "input.path":"/dependency/unprocessed/",
    "input.file.pattern":"^.*\\.json$",
    "error.path":"/dependency/json/error",
    "finished.path":"/dependency/json/processed",
    "auto.register.schemas": "false",
    "schema.generation.enabled": "true",
    "topic":"checklist",
    "value.converter.enhanced.avro.schema.support":"true",
    "topic.creation.enable": "true"
    }
}'
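
For reference, the connector status and the schema that Connect actually registered can be inspected like this (topic and service names are the ones from the configs above):

# Connector and task state as reported by the Connect REST API
curl -s http://localhost:8083/connectors/kafka-connect-spooldir/status

# Latest schema registered under the topic's value subject in Schema Registry
curl -s http://schema-registry-svc:8081/subjects/checklist-value/versions/latest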

There are no exceptions in the Kafka Connect logs, and the connector moves the files from the unprocessed folder to the processed folder. But when I try to read the topic with kafka-console-consumer, it reports that it read 2 messages yet displays nothing at all (just 2 empty lines, and then it hangs). I then tried registering the checklist schema in Schema Registry and processed the files again - same result, except this time kafka-console-consumer reported 4 messages consumed and still displayed nothing. I also tried kafka-avro-console-consumer - that time 0 messages were consumed.

I even noticed that Kafka Connect registered a rather "fake" schema for my object:

{"subject":"checklist-value","version":5,"id":1,"schema":"{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"com.github.jcustenborder.kafka.connect.model\",\"fields\":[{\"name\":\"Checklist\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"com.github.jcustenborder.kafka.connect.model.Value\"}"}

I also checked the Kafka and Schema Registry logs but didn't notice anything wrong there. Probably not relevant, but I also tried setting "schema.generation.enabled": "false"; the logs then showed a java.lang.NullPointerException saying that 'value.schema' must be set if 'schema.generation.enabled' is false. I then tried converting my existing .avsc into a Connect schema with the kafka-connect-avro-converter, but it turned out there is no proper way to print the generated schema, so I guess that is intentional and it does not work the way I expected.
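
The console-consumer invocations referred to above were roughly of this form (topic and service names taken from the configs above; the exact flags may have differed):

# Plain consumer - shows empty lines for the checklist topic
kafka-console-consumer --bootstrap-server kafka:9092 --topic checklist --from-beginning

# Avro consumer - needs the Schema Registry URL to decode the values
kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic checklist --from-beginning \
  --property schema.registry.url=http://schema-registry-svc:8081
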
What am I missing in my Kafka Connect configuration to get serialization working with Schema Registry and to stop sending empty messages to the topic?

No answers yet.
