使用自定义schema注册Kafka Connect datagen连接器后,任务状态显示失败:"state":"FAILED"。
这是注册连接器的请求:
$ curl -i -X PUT http://localhost:8083/connectors/datagen01/config -H "Content-Type: application/json" -d '{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "topicx",
"schema.filename": "myschema.avro",
"schema.keyfield": "userid",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"tasks.max": "1"
}'
HTTP/1.1 200 OK
Date: Fri, 11 Dec 2020 15:51:37 GMT
Content-Type: application/json
Content-Length: 455
Server: Jetty(9.4.24.v20191120)
这是状态检查(任务状态为"FAILED"):
curl -s http://localhost:8083/connectors/datagen01/status
{"name":"datagen01","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.avro.SchemaParseException: Cannot parse <null> schema\n\tat org.apache.avro.Schema.parse(Schema.java:1595)\n\tat org.apache.avro.Schema$Parser.parse(Schema.java:1394)\n\tat org.apache.avro.Schema$Parser.parse(Schema.java:1365)\n\tat io.confluent.avro.random.generator.Generator$Builder.schemaStream(Generator.java:277)\n\tat io.confluent.kafka.connect.datagen.DatagenTask.start(DatagenTask.java:174)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n"}],"type":"source"}
(注意:curl命令是从包含myschema.avro文件的同一文件夹执行的)
这里是进一步的背景资料,如果需要的话
Dockerfile:
FROM confluentinc/cp-kafka-connect-base:6.0.0
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.4.0
docker-compose.yml:
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.0.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:6.0.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
schema-registry:
image: confluentinc/cp-schema-registry:6.0.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'
connect:
image: localimage/kafka-connect-datagen:latest
build:
context: .
dockerfile: Dockerfile
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- 8083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
myschema.avro:
{
"namespace": "ksql",
"name": "users",
"type": "record",
"fields": [
{"name": "registertime", "type": {
"type": "long",
"arg.properties": {
"range": {
"min": 1487715775521,
"max": 1519273364600
}
}
}},
{"name": "userid", "type": {
"type": "string",
"arg.properties": {
"regex": "User_[1-9]{0,1}"
}
}},
{"name": "regionid", "type": {
"type": "string",
"arg.properties": {
"regex": "Region_[1-9]?"
}
}},
{"name": "gender", "type": {
"type": "string",
"arg.properties": {
"options": [
"MALE",
"FEMALE",
"OTHER"
]
}
}}
]
}
运行docker-compose:
docker-compose up -d --build
已注册的datagen连接器配置
$ curl -i -X PUT http://localhost:8083/connectors/datagen01/config -H "Content-Type: application/json" -d '{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "topicx",
"schema.filename": "myschema.avro",
"schema.keyfield": "userid",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"tasks.max": "1"
}'
HTTP/1.1 200 OK
Date: Fri, 11 Dec 2020 15:51:37 GMT
Content-Type: application/json
Content-Length: 455
Server: Jetty(9.4.24.v20191120)
已检查状态
curl -s http://localhost:8083/connectors/datagen01/status
{"name":"datagen01","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.avro.SchemaParseException: Cannot parse <null> schema\n\tat org.apache.avro.Schema.parse(Schema.java:1595)\n\tat org.apache.avro.Schema$Parser.parse(Schema.java:1394)\n\tat org.apache.avro.Schema$Parser.parse(Schema.java:1365)\n\tat io.confluent.avro.random.generator.Generator$Builder.schemaStream(Generator.java:277)\n\tat io.confluent.kafka.connect.datagen.DatagenTask.start(DatagenTask.java:174)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n"}],"type":"source"}
1条答案
按热度按时间huus2vyu1#
错误信息已经说明了失败的原因:schema为空(null)。
如果 schema.filename 指向的文件不在容器中,那么无论curl命令在何处运行,都无法读取该schema。您可以将该文件复制到容器中,也可以使用卷挂载;此外,还需要在连接器配置中使用该文件的完整路径。