Spooldir source connector not showing up

fnx2tebb, posted 2021-06-04 in Kafka

I am trying to create a Kafka Connect spooldir source connector via a REST API call. I started ZooKeeper and the Kafka broker, launched a distributed Connect worker with kafka/bin/connect-distributed.sh dir-distributed.properties, and then called the following API from Postman:

POST http://localhost:8083/connectors
{
    "name": "csv-source-orders",
    "config": {
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "tasks.max": "1",
        "topic": "orders",
        "input.file.pattern":"^orders.*.csv$",
        "input.path":"/Users/ivij/temp/source",
        "finished.path":"/Users/ivij/temp/finished",
        "error.path":"/Users/ivij/temp/error",
        "halt.on.error": "false",
        "csv.separator.char":"01",
        "value.schema":"{\"name\":\"com.github.jcustenborder.kafka.connect.model.Value\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"order_id\":{\"type\":\"INT64\",\"isOptional\":false},\"customer_id\":{\"type\":\"INT64\",\"isOptional\":false},\"order_ts\":{\"type\":\"STRING\",\"isOptional\":false},\"product\":{\"type\":\"STRING\",\"isOptional\":false},\"order_total_usd\":{\"type\":\"STRING\",\"isOptional\":false}}}",
        "key.schema":"{\"name\":\"com.github.jcustenborder.kafka.connect.model.Key\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"order_id\":{\"type\":\"INT64\",\"isOptional\":false}}}",
        "csv.first.row.as.header": "true",
        "flush.size": "100",
        "rotate.interval.ms": "1000"
    }
}
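For reference, the same request can be issued from the command line with curl (a sketch; it assumes the JSON body above has been saved to a file named csv-source-orders.json):

```shell
# POST the connector config to the Connect worker's REST API
curl -s -X POST \
  -H "Content-Type: application/json" \
  --data @csv-source-orders.json \
  http://localhost:8083/connectors
```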

The response status was 201 Created, indicating that a new resource was created. However, in the response body the tasks field is empty:

"tasks": [],
    "type": "source"

When I try to check the connector's status, or list the connectors with GET localhost:8083/connectors/, the response I get is [].
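The per-connector status endpoint is usually more informative than the plain list; a sketch of the two calls against a running worker:

```shell
# List all connectors known to the worker
curl -s http://localhost:8083/connectors

# Show the state of the connector and of each of its tasks,
# including a stack trace if a task is FAILED
curl -s http://localhost:8083/connectors/csv-source-orders/status
```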
I also tried listing the topics created on port 8083 (to check whether the "orders" topic from the API JSON was created), but that resulted in an OutOfMemory error.
Is the connector being created at all? How can I troubleshoot this?
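Two things I want to rule out (assumptions on my part, not confirmed by any logs): that the spooldir plugin was not actually picked up from plugin.path, and that my topic listing was aimed at the Connect REST port rather than the broker:

```shell
# Verify that the worker loaded the spooldir connector class
curl -s http://localhost:8083/connector-plugins

# Topics live on the broker (9092), not on the Connect REST port (8083)
kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```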
EDIT: here is the dir-distributed.properties file:


# connect-distributed.properties

bootstrap.servers=localhost:9092
group.id=connect-cluster-a

rest.port=8083

schema.generation.enabled=true
schema.generation.value.name=schemavalue
schema.generation.key.name=schemakey

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=10000
plugin.path=.../kafka-connect-spooldir/target/kafka-connect-target/usr/share/kafka-connect/
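Connect also exposes a validation endpoint that checks a connector config without creating anything; a sketch against the same worker (using only a subset of the config above, since the body must repeat connector.class):

```shell
# Validate the config; the response flags any missing or invalid fields
curl -s -X PUT \
  -H "Content-Type: application/json" \
  --data '{"connector.class":"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector","topic":"orders","input.path":"/Users/ivij/temp/source","input.file.pattern":"^orders.*.csv$","finished.path":"/Users/ivij/temp/finished","error.path":"/Users/ivij/temp/error"}' \
  http://localhost:8083/connector-plugins/SpoolDirCsvSourceConnector/config/validate
```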
