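I am trying to create a Kafka Connect SpoolDir source connector through a REST API call. After starting ZooKeeper and the Kafka server, and launching the worker with kafka/bin/connect-distributed.sh dir-distributed.properties, I made the following API call from Postman: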
POST http://localhost:8083/connectors
{
  "name": "csv-source-orders",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "tasks.max": "1",
    "topic": "orders",
    "input.file.pattern": "^orders.*.csv$",
    "input.path": "/Users/ivij/temp/source",
    "finished.path": "/Users/ivij/temp/finished",
    "error.path": "/Users/ivij/temp/error",
    "halt.on.error": "false",
    "csv.separator.char": "01",
    "value.schema": "{\"name\":\"com.github.jcustenborder.kafka.connect.model.Value\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"order_id\":{\"type\":\"INT64\",\"isOptional\":false},\"customer_id\":{\"type\":\"INT64\",\"isOptional\":false},\"order_ts\":{\"type\":\"STRING\",\"isOptional\":false},\"product\":{\"type\":\"STRING\",\"isOptional\":false},\"order_total_usd\":{\"type\":\"STRING\",\"isOptional\":false}}}",
    "key.schema": "{\"name\":\"com.github.jcustenborder.kafka.connect.model.Key\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"order_id\":{\"type\":\"INT64\",\"isOptional\":false}}}",
    "csv.first.row.as.header": "true",
    "flush.size": "100",
    "rotate.interval.ms": "1000"
  }
}
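For reference, the same request can be reproduced outside Postman. This is only a minimal sketch, assuming Python 3 and the standard library; the helper name `build_create_request` is hypothetical, and just a subset of the settings above is included. Serializing the embedded schema with `json.dumps` avoids escaping the quotes by hand.

```python
import json
import urllib.request

# REST address of the Connect worker, as used in the question.
CONNECT_URL = "http://localhost:8083"


def build_create_request(base_url=CONNECT_URL):
    """Build (but do not send) the POST /connectors request (hypothetical helper)."""
    key_schema = {
        "name": "com.github.jcustenborder.kafka.connect.model.Key",
        "type": "STRUCT",
        "isOptional": False,
        "fieldSchemas": {"order_id": {"type": "INT64", "isOptional": False}},
    }
    payload = {
        "name": "csv-source-orders",
        "config": {
            "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
            "tasks.max": "1",
            "topic": "orders",
            "input.path": "/Users/ivij/temp/source",
            # Config values are strings, so the schema is embedded as a JSON string.
            "key.schema": json.dumps(key_schema),
        },
    }
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Passing the result to `urllib.request.urlopen` would send it; the function only builds the request so that the body can be inspected first.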
The response status is 201 Created, which indicates that a new resource was created. However, in the response body the tasks field is empty:
"tasks": [],
"type": "source"
When I try to check the connector's status, or list the connectors with GET localhost:8083/connectors/, the response I receive is [].
I also tried listing the topics against port 8083 (to check whether the "orders" topic from the API JSON had been created), but that results in an OutOfMemory error.
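One plausible explanation for the OutOfMemory error, assuming the topic-listing tool was given localhost:8083 as if it were a broker address: port 8083 is the Connect REST (HTTP) port, whereas the broker listens on localhost:9092 (the `bootstrap.servers` value). A Kafka client reads the first four bytes of a response as a big-endian 32-bit frame length, so an HTTP reply is misread as a very large message. The sketch below illustrates the arithmetic; the function name and the sample HTTP status line are illustrative assumptions.

```python
import struct


def apparent_frame_size(first_bytes):
    """Read the first four bytes as a big-endian signed 32-bit length prefix."""
    return struct.unpack(">i", first_bytes[:4])[0]


# Illustrative HTTP reply; only the leading b"HTTP" matters here.
size = apparent_frame_size(b"HTTP/1.1 400 Bad Request\r\n")
print(size)                    # 1213486160
print(round(size / 2**30, 2))  # 1.13 (GiB the client would try to allocate)
```

Under that assumption, listing topics against localhost:9092 instead would avoid the error.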
Is the connector actually being created? How can I fix this?
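One way to narrow this down is to ask the worker directly through the Kafka Connect REST endpoints `GET /connectors/{name}/status` and `GET /connector-plugins`. The following is a minimal sketch, assuming Python 3 and the standard library; the helper names (`fetch_json`, `summarize_status`, `plugin_installed`, `diagnose`) are hypothetical, and the sketch only reports what the worker returns rather than fixing anything.

```python
import json
import urllib.request

SPOOLDIR_CLASS = "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector"


def fetch_json(url):
    """GET a URL and decode the JSON body."""
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def summarize_status(status):
    """Turn a /connectors/{name}/status response into readable lines."""
    connector = status.get("connector", {})
    lines = [f"connector {status.get('name')}: {connector.get('state')}"]
    tasks = status.get("tasks", [])
    if not tasks:
        lines.append("no tasks reported")
    for task in tasks:
        line = f"task {task.get('id')}: {task.get('state')}"
        if task.get("trace"):
            # Failed tasks carry a stack trace; show only its first line.
            line += " (" + task["trace"].splitlines()[0] + ")"
        lines.append(line)
    return lines


def plugin_installed(plugins, class_name=SPOOLDIR_CLASS):
    """Check whether a /connector-plugins response lists the given class."""
    return any(p.get("class") == class_name for p in plugins)


def diagnose(base_url="http://localhost:8083", name="csv-source-orders"):
    """Print plugin availability and connector/task states (needs a running worker)."""
    print("plugin loaded:", plugin_installed(fetch_json(f"{base_url}/connector-plugins")))
    for line in summarize_status(fetch_json(f"{base_url}/connectors/{name}/status")):
        print(line)
```

Calling `diagnose()` against the running worker would show whether the SpoolDir class was picked up from `plugin.path` and whether any task was started or failed. The worker log is the other place to look for errors.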
Edit: here is the "dir-distributed.properties" file:
# connect-distributed.properties
bootstrap.servers=localhost:9092
group.id=connect-cluster-a
rest.port=8083
schema.generation.enabled=true
schema.generation.value.name=schemavalue
schema.generation.key.name=schemakey
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=.../kafka-connect-spooldir/target/kafka-connect-target/usr/share/kafka-connect/