Why does a simple Kafka datagen connector fail (i.e. "state":"FAILED")?

332nm8kg · posted 2021-06-04 in Kafka

After registering the Kafka datagen connector with a custom schema, the status check reports a failure: "state":"FAILED".
Here is the registration:

$ curl -i -X PUT http://localhost:8083/connectors/datagen01/config  -H "Content-Type: application/json"  -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "topicx",
    "schema.filename": "myschema.avro",
    "schema.keyfield": "userid",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "1"
}'
HTTP/1.1 200 OK
Date: Fri, 11 Dec 2020 15:51:37 GMT
Content-Type: application/json
Content-Length: 455
Server: Jetty(9.4.24.v20191120)

Here is the status check ("FAILED"):

curl -s http://localhost:8083/connectors/datagen01/status
{"name":"datagen01","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.avro.SchemaParseException: Cannot parse <null> schema\n\tat org.apache.avro.Schema.parse(Schema.java:1595)\n\tat org.apache.avro.Schema$Parser.parse(Schema.java:1394)\n\tat org.apache.avro.Schema$Parser.parse(Schema.java:1365)\n\tat io.confluent.avro.random.generator.Generator$Builder.schemaStream(Generator.java:277)\n\tat io.confluent.kafka.connect.datagen.DatagenTask.start(DatagenTask.java:174)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n"}],"type":"source"}

(Note: the curl command was executed from the same folder that contains the myschema.avro file.)
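Since the task runs inside the Connect worker, what matters is whether the container, not the host shell, can see the file. A quick check, assuming the container is named connect as in the compose file below:

# Search the container's filesystem for the schema file; no output means
# the worker has no copy of it to load
docker exec connect find / -name myschema.avro 2>/dev/null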
Here is further context, if needed.
Dockerfile...

FROM confluentinc/cp-kafka-connect-base:6.0.0
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.4.0
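Once the worker is up, the connector-plugins endpoint can confirm the plugin was actually installed. A sketch, again assuming jq:

# List the classes of all installed connector plugins; DatagenConnector should appear
curl -s http://localhost:8083/connector-plugins | jq '.[].class'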

docker-compose.yml...

version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:6.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'

  connect:
    image: localimage/kafka-connect-datagen:latest
    build:
      context: .
      dockerfile: Dockerfile
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"

myschema.avro...

{
    "namespace": "ksql",
    "name": "users",
    "type": "record",
    "fields": [
        {"name": "registertime", "type": {
            "type": "long",
            "arg.properties": {
                "range": {
                    "min": 1487715775521,
                    "max": 1519273364600
                }
            }
        }},
        {"name": "userid", "type": {
            "type": "string",
            "arg.properties": {
                "regex": "User_[1-9]{0,1}"
            }
        }},
        {"name": "regionid", "type": {
            "type": "string",
            "arg.properties": {
                "regex": "Region_[1-9]?"
            }
        }},
        {"name": "gender", "type": {
            "type": "string",
            "arg.properties": {
                "options": [
                    "MALE",
                    "FEMALE",
                    "OTHER"
                ]
            }
        }}
    ]
}
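This closely follows the users quickstart schema bundled with kafka-connect-datagen, so the schema content itself is unremarkable; still, a quick well-formedness check rules out a local syntax error (assuming jq):

# Exits non-zero and prints a parse error if the file is not valid JSON
jq empty myschema.avro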

Ran docker-compose...

docker-compose up -d --build
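The worker takes a while to start, so it helps to wait for the REST API before registering anything. A small sketch:

# Block until the Connect REST API responds, then proceed
until curl -sf http://localhost:8083/connectors > /dev/null; do sleep 5; done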

Registered the datagen connector config (the same curl -X PUT request shown above, which again returned HTTP/1.1 200 OK).

Checked the status (same result as above: the connector is RUNNING, but the task is FAILED with org.apache.avro.SchemaParseException: Cannot parse <null> schema).
huus2vyu1#

It tells you why it failed: the schema is null.
If the file referenced by schema.filename is not inside the container, the schema cannot be read, no matter where the curl command is run from.
You can either copy it into the container or use a volume mount, and you also need to use the full file path inside the container in the connector config.
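A minimal sketch of the volume-mount option (the /schemas mount point is an arbitrary choice):

  connect:
    # ...existing service definition...
    volumes:
      - ./myschema.avro:/schemas/myschema.avro

Then point the connector at the absolute path inside the container:

curl -i -X PUT http://localhost:8083/connectors/datagen01/config -H "Content-Type: application/json" -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "topicx",
    "schema.filename": "/schemas/myschema.avro",
    "schema.keyfield": "userid",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "1"
}'

After recreating the connect container with docker-compose up -d and re-registering, the task should come back as RUNNING.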
