使用独立模式,我创建了一个连接器和自定义转换,如下所示:
name=rabbitmq-source
connector.class=com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
rabbitmq.host=rabbitmq-server
rabbitmq.queue=answers
kafka.topic=net.gutefrage.answers
transforms=extractFields
transforms.extractFields.type=net.gutefrage.connector.transforms.ExtractFields$Value
transforms.extractFields.fields=body,envelope.routingKey
transforms.extractFields.structName=net.gutefrage.events
但是对于分布式连接器,连接restapi的put请求的语法是什么?我在文件里找不到任何例子。
已经尝试了以下几种方法:
cat <<EOF >/tmp/connector
{
"name": "rabbitmq-source",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max": "1",
"rabbitmq.host": "rabbitmq-server",
"rabbitmq.queue": "answers",
"kafka.topic": "net.gutefrage.answers",
"transforms": "extractFields",
"transforms.extractFields": {
"type": "net.gutefrage.connector.transforms.ExtractFields$Value",
"fields": "body,envelope.routingKey",
"structName": "net.gutefrage.events"
}
}
}
EOF
curl -vs --stderr - -X POST -H "Content-Type: application/json" --data @/tmp/connector "http://localhost:8083/connectors"
rm /tmp/connector
或者这也不起作用:
{
"name": "rabbitmq-source",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max": "1",
"rabbitmq.host": "rabbitmq-server",
"rabbitmq.queue": "answers",
"kafka.topic": "net.gutefrage.answers",
"transforms": "extractFields",
"transforms.extractFields.type": "net.gutefrage.connector.transforms.ExtractFields$Value",
"transforms.extractFields.fields": "body,envelope.routingKey",
"transforms.extractFields.structName": "net.gutefrage.events"
}
}
对于最后一个变体,我得到以下错误:
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class net.gutefrage.connector.transforms.ExtractFields for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}
请注意,使用properties格式,它可以很好地工作(使用landoops在fast-data-dev中创建新的连接器ui。有趣的是,landoop的ui特性“translate to curl”生成与我的第二个示例相同的json)
更新
为了确保landoop、docker和我的自定义转换没有问题,我使用cop3.3.0中的标准分布式属性在分布式模式下启动了zookeeper、broker、schema registry和kafka connect bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
哪些日志 [2017-09-13 14:07:52,930] INFO Loading plugin from: /opt/connectors/confluent-oss-gf-assembly-1.0.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:176) [2017-09-13 14:07:53,711] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/connectors/confluent-oss-gf-assembly-1.0.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199) [2017-09-13 14:07:53,711] INFO Added plugin 'com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132) [2017-09-13 14:07:53,712] INFO Added plugin 'net.gutefrage.connector.transforms.ExtractFields$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132) [2017-09-13 14:07:53,712] INFO Added plugin 'net.gutefrage.connector.transforms.ExtractFields$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
目前一切正常。然后我创建了一个连接器配置: cat <<EOF >/tmp/connector { "name": "rabbitmq-source", "config": { "connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector", "tasks.max": "1", "rabbitmq.host": "rabbitmq-server", "rabbitmq.queue": "answers", "kafka.topic": "net.gutefrage.answers", "transforms": "extractFields", "transforms.extractFields.type": "org.apache.kafka.connect.transforms.ExtractField$Value", "transforms.extractFields.field": "body" } } EOF
请注意,我现在使用的是标准(捆绑)提取字段转换。当我把它贴在 curl -vs --stderr - -X POST -H "Content-Type: application/json" --data @/tmp/connector "http://localhost:8083/connectors"
我也一样 {"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class org.apache.kafka.connect.transforms.ExtractField for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint
/{connectorType}/config/validate"}*
4条答案
按热度按时间eaf3rand1#
如果要在独立模式下运行kafka connect worker,则必须启动worker并提供worker配置文件和一个或多个连接器配置文件。所有这些配置文件都是java属性格式,因此您提供的第一个配置示例是正确的格式:
如果要在分布式模式下运行kafka connect worker,则必须首先启动分布式worker,然后使用restapi和
PUT
使用json文档请求/connectors
终结点。该json文档将与您的第二个json文档的格式匹配:confluent cli包含在confluent的开源平台(包括kafka)中,它是一个开发人员工具,可以帮助您在分布式模式下运行zookeeper示例、kafka代理、confluent架构注册表、rest代理和连接工作程序,从而快速入门。加载连接器时,可以将连接器配置指定为json文件或属性文件,并使用将后者转换为json格式
jq
.但是,您报告的错误是:
此错误消息的重要部分是“从转换获取配置定义时出错:null”。虽然这有点太神秘了,但这意味着
config()
方法net.gutefrage.connector.transforms.ExtractFields
java类返回null。确保
net.gutefrage.connector.transforms.ExtractFields$Value
指定的字符串是嵌套静态类的正确完全限定名Value
,而且Value
类完全正确地实现org.apache.kafka.connect.transforms.Transformation<? extends ConnectRecord<R>>
接口。请注意config()
方法必须返回非nullConfigDef
对象。看一看ApacheKafka附带的单个消息转换(smt)的示例,或者robin的博客文章中的其他示例。
7fhtutme2#
要使用connector config的json格式和cp connect cli,必须在运行kafka connect集群的计算机上安装jq工具。
e、 对于landoops快速数据开发环境,您必须
这样就行了:
但这并不能解决使用连接器rest端点时的问题。
uurv41yg3#
确保bash命令cat不会将transforms.extractfields.type=net.gutefrage.connector.transforms.extractfields中的$value解释为变量。它对我有用。
2wnc66cl4#
与
fast-data-dev
您可以为任何连接器构建一个jar文件,然后按照下面的说明将其添加到类路径中https://github.com/landoop/fast-data-dev#enable-附加连接器
用户界面将自动检测新连接器-并在以下位置为新连接器单击“新建”时提供说明:
http://localhost:3030/Kafka连接ui
还有什么值得一试的呢
fast-data-dev
已经有了一个通用的mqtt接收器连接器,正在尝试。请参阅上的说明http://docs.datamountaineer.com/en/latest/mqtt-sink.html实际上你需要这样做
connect.mqtt.kcql=INSERT INTO /answers SELECT body FROM net.gutefrage.answers
由于这是一个通用的mqtt连接器,您可能需要使用enable-additional-connectors
说明