我需要创建一个Kafka连接器与主题转换。根据本页https://docs.confluent.io/current/connect/transforms/extracttopic.html#extracttopic,我正在执行以下步骤:
confluent-hub install confluentinc/connect-transforms:latest
# Then plugin-path is automatically updated in /etc/schema-registry/connect-avro-distributed.properties
confluent start # This starts my confluent services including kafka-connect
我可以在连接日志中看到以下行:
cmd: confluent log connect
includes:
[2020-10-29 05:52:12,156] INFO Added plugin 'io.confluent.connect.transforms.ExtractTopic$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:140)
[2020-10-29 05:52:12,156] INFO Added plugin 'io.confluent.connect.transforms.ExtractTopic$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:140)
现在,我尝试使用以下配置创建连接器:
cat connect_config/connect-file-sink.properties
name=file-sink
connector.class=FileStreamSink
tasks.max=1
file=/root/sink.test
topics=meetups
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
transforms=KeyExample
transforms.KeyExample.type=io.confluent.connect.transforms.ExtractTopic$Key
我得到以下错误:
confluent load file-sink -d connect_config/connect-file-sink.properties
This CLI is intended for development only, not for production
https://docs.confluent.io/current/cli/index.html
{
"error_code": 400,
"message": "Connector configuration is invalid and contains the following 2 error(s):\nInvalid value io.confluent.connect.transforms.ExtractTopic$Key for configuration transforms.KeyExample.type: Class io.confluent.connect.transforms.ExtractTopic$Key could not be found.\nInvalid value null for configuration transforms.KeyExample.type: Not a Transformation\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}
请让我知道如何解决这个错误。
暂无答案!
目前还没有任何答案,快来回答吧!