How do I write the JSON to create a distributed Kafka Connect instance with a transform?

f3temu5u  posted on 2021-06-07  in  Kafka

In standalone mode, I created a connector with a custom transform as follows:

name=rabbitmq-source
connector.class=com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
rabbitmq.host=rabbitmq-server
rabbitmq.queue=answers
kafka.topic=net.gutefrage.answers
transforms=extractFields
transforms.extractFields.type=net.gutefrage.connector.transforms.ExtractFields$Value
transforms.extractFields.fields=body,envelope.routingKey
transforms.extractFields.structName=net.gutefrage.events

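But for a distributed connector, what is the syntax of the PUT request to the Connect REST API? I can't find any examples in the docs.
I have already tried the following: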

cat <<EOF >/tmp/connector
{
  "name": "rabbitmq-source",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max": "1",
    "rabbitmq.host": "rabbitmq-server",
    "rabbitmq.queue": "answers",
    "kafka.topic": "net.gutefrage.answers",
    "transforms": "extractFields",
    "transforms.extractFields": {
      "type": "net.gutefrage.connector.transforms.ExtractFields$Value",
      "fields": "body,envelope.routingKey",
      "structName": "net.gutefrage.events"
    }
  }
}
EOF

curl -vs --stderr - -X POST -H "Content-Type: application/json" --data @/tmp/connector "http://localhost:8083/connectors"
rm /tmp/connector

This variant doesn't work either:

{
  "name": "rabbitmq-source",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max": "1",
    "rabbitmq.host": "rabbitmq-server",
    "rabbitmq.queue": "answers",
    "kafka.topic": "net.gutefrage.answers",
    "transforms": "extractFields",
    "transforms.extractFields.type": "net.gutefrage.connector.transforms.ExtractFields$Value",
    "transforms.extractFields.fields": "body,envelope.routingKey",
    "transforms.extractFields.structName": "net.gutefrage.events"
  }
}

For the last variant, I get the following error:

{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class net.gutefrage.connector.transforms.ExtractFields for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}

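Note that it works fine with the properties format (using Landoop's "create new connector" UI in fast-data-dev). Interestingly, the "translate to curl" feature of Landoop's UI generates the same JSON as my second example.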
Update

To rule out problems with Landoop, Docker, and my custom transform, I started ZooKeeper, the broker, Schema Registry, and Kafka Connect in distributed mode, using the standard distributed properties from CP 3.3.0:

bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties

which logs:

[2017-09-13 14:07:52,930] INFO Loading plugin from: /opt/connectors/confluent-oss-gf-assembly-1.0.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:176)
[2017-09-13 14:07:53,711] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/connectors/confluent-oss-gf-assembly-1.0.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-09-13 14:07:53,711] INFO Added plugin 'com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-09-13 14:07:53,712] INFO Added plugin 'net.gutefrage.connector.transforms.ExtractFields$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-09-13 14:07:53,712] INFO Added plugin 'net.gutefrage.connector.transforms.ExtractFields$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)

So far everything looks fine. Then I created a connector config:

cat <<EOF >/tmp/connector
{
  "name": "rabbitmq-source",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max": "1",
    "rabbitmq.host": "rabbitmq-server",
    "rabbitmq.queue": "answers",
    "kafka.topic": "net.gutefrage.answers",
    "transforms": "extractFields",
    "transforms.extractFields.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractFields.field": "body"
  }
}
EOF

Note that I am now using the standard (bundled) ExtractField transform. When I POST it with

curl -vs --stderr - -X POST -H "Content-Type: application/json" --data @/tmp/connector "http://localhost:8083/connectors"

I get the same error:

{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class org.apache.kafka.connect.transforms.ExtractField for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}


eaf3rand1#

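If you run a Kafka Connect worker in standalone mode, you have to start the worker and supply a worker configuration file plus one or more connector configuration files. All of these configuration files use the Java properties format, so the first configuration example you provided is in the right format: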

name=rabbitmq-source
connector.class=com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
rabbitmq.host=rabbitmq-server
rabbitmq.queue=answers
kafka.topic=net.gutefrage.answers
transforms=extractFields
transforms.extractFields.type=net.gutefrage.connector.transforms.ExtractFields$Value
transforms.extractFields.fields=body,envelope.routingKey
transforms.extractFields.structName=net.gutefrage.events

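If you run a Kafka Connect worker in distributed mode, you have to start the distributed worker first and then create the connector through the REST API by sending a POST request with a JSON document to the /connectors endpoint. (A PUT request goes to /connectors/{name}/config instead and carries only the config map.) That JSON document matches the format of your second JSON document: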

{
  "name": "rabbitmq-source",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max": "1",
    "rabbitmq.host": "rabbitmq-server",
    "rabbitmq.queue": "answers",
    "kafka.topic": "net.gutefrage.answers",
    "transforms": "extractFields",
    "transforms.extractFields.type": "net.gutefrage.connector.transforms.ExtractFields$Value",
    "transforms.extractFields.fields": "body,envelope.routingKey",
    "transforms.extractFields.structName": "net.gutefrage.events"
  }
}
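As a rough sketch of the two REST calls, the helpers below wrap curl for creating a connector and for updating its configuration. The function names and the CONNECT_URL variable are made up for illustration; the port 8083 is the one used in the question, and the file arguments are assumed to hold JSON in the shape each endpoint expects.

```shell
# Assumed worker address; 8083 is the port used in the question.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# POST /connectors creates a connector.
# The file must contain {"name": "...", "config": {...}}.
create_connector() {
  curl -s -X POST -H "Content-Type: application/json" \
    --data @"$1" "$CONNECT_URL/connectors"
}

# PUT /connectors/{name}/config creates or updates a connector.
# The file must contain only the flat config map, without the "name" wrapper.
put_connector_config() {
  curl -s -X PUT -H "Content-Type: application/json" \
    --data @"$2" "$CONNECT_URL/connectors/$1/config"
}

# Example calls (hypothetical file names, not executed here):
#   create_connector /tmp/connector
#   put_connector_config rabbitmq-source /tmp/connector-config-only.json
```

In both cases the transform settings stay flat keys such as "transforms.extractFields.type", as in the JSON document above, rather than a nested object.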

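The Confluent CLI, included in Confluent's open source platform (which includes Kafka), is a developer tool that helps you get started quickly by running a ZooKeeper instance, a Kafka broker, the Confluent Schema Registry, the REST Proxy, and a Connect worker in distributed mode. When you load a connector, you can specify the connector configuration as either a JSON file or a properties file; the CLI uses jq to convert the latter to JSON.
However, the error you reported is: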

{
  "error_code":400,
  "message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class net.gutefrage.connector.transforms.ExtractFields for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}

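The important part of this error message is "Error getting config definition from Transformation: null". Although that is rather cryptic, it means that the config() method of the net.gutefrage.connector.transforms.ExtractFields Java class returned null.
Make sure that the net.gutefrage.connector.transforms.ExtractFields$Value string you specified is the correct fully qualified name of the nested static class Value, and that the Value class fully and correctly implements the org.apache.kafka.connect.transforms.Transformation<R extends ConnectRecord<R>> interface. Note that the config() method must return a non-null ConfigDef object.
For examples, take a look at the Single Message Transforms (SMTs) that ship with Apache Kafka, or at the other examples in Robin's blog posts.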


7fhtutme2#

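To use the JSON format for the connector config with the CP Connect CLI, the jq tool must be installed on the machine that runs the Kafka Connect cluster.
E.g., for Landoop's fast-data-dev environment you have to run: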

docker exec rabbitmqconnect_fast-data-dev_1 apk add --no-cache jq

After that, this works:

docker exec rabbitmqconnect_fast-data-dev_1 /opt/confluent-3.3.0/bin/confluent config rabbitmq-source -d /tmp/connector-config.json

But this does not solve the problem when using the Connect REST endpoint.


uurv41yg3#

Make sure that bash, when running the cat command, does not interpret the $Value in transforms.extractFields.type=net.gutefrage.connector.transforms.ExtractFields$Value as a variable. That worked for me.
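The quoting issue can be reproduced in a few lines. This is a minimal sketch, assuming a shell in which Value expands to nothing, and using throwaway temp files rather than the /tmp/connector path from the question. It would also be consistent with the error in the question, which names the class as ...ExtractFields without the $Value suffix.

```shell
# Simulate the usual case where no variable named Value holds anything.
Value=""

# Scratch files for the demonstration; removed when the script exits.
unquoted=$(mktemp)
quoted=$(mktemp)
trap 'rm -f "$unquoted" "$quoted"' EXIT

# Unquoted delimiter: the shell expands $Value inside the heredoc body.
cat <<EOF >"$unquoted"
"transforms.extractFields.type": "net.gutefrage.connector.transforms.ExtractFields$Value"
EOF

# Quoted delimiter: the body is written literally, so $Value survives.
cat <<'EOF' >"$quoted"
"transforms.extractFields.type": "net.gutefrage.connector.transforms.ExtractFields$Value"
EOF

cat "$unquoted"   # ... "net.gutefrage.connector.transforms.ExtractFields"
cat "$quoted"     # ... "net.gutefrage.connector.transforms.ExtractFields$Value"
```

Quoting the delimiter (<<'EOF') is one fix; escaping the dollar sign as \$Value inside an unquoted heredoc is another.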


2wnc66cl4#

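With fast-data-dev you can build a JAR file for any connector and add it to the classpath by following the instructions at
https://github.com/landoop/fast-data-dev#enable-additional-connectors
The UI will detect the new connector automatically and show instructions when you click "New" for a new connector at:
http://localhost:3030/kafka-connect-ui
Something else worth trying: fast-data-dev already ships a generic MQTT sink connector. See the instructions at http://docs.datamountaineer.com/en/latest/mqtt-sink.html
In effect you would need connect.mqtt.kcql=INSERT INTO /answers SELECT body FROM net.gutefrage.answers. Since this is a generic MQTT connector, you may need to follow the enable-additional-connectors instructions.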
