我想知道在汇合中心使用datagenconnector的过程,以便为测试主题生成随机数据。
我尝试用一些设置连接datagen连接器,但总是失败。
更新:我尝试连接在主题中创建的我自己的模式,但它没有将其连接起来,并且datagen连接器无法启动。
下面是datagen connect的配置:
{
"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schemas.enable": "false",
"name": "datagen-protobuf-userprofile",
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"transforms": [
"SetSchemaMetadata"
],
"transforms.SetSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.SetSchemaMetadata.schema.name": "value_User_PROFILE",
"kafka.topic": "User_PROFILE",
"max.interval": "1000",
"iterations": "10000000",
"schema.filename": "value_User_PROFILE",
"schema.keyfield": "userid",
"quickstart": "value_User_PROFILE"
}
新主题中定义的架构:
{
"doc": "Sample schema to help you get started.",
"fields": [
{
"doc": "The int type is a 32-bit signed integer.",
"name": "userid",
"type": "int"
},
{
"doc": "The string is a unicode character sequence.",
"name": "firstname",
"type": "string"
},
{
"doc": "The string is a unicode character sequence.",
"name": "lastname",
"type": "string"
},
{
"doc": "The string is a unicode character sequence.",
"name": "countrycode",
"type": "string"
},
{
"doc": "this si double which store floting value as well",
"name": "rating",
"type": "double"
}
],
"name": "value_User_PROFILE",
"namespace": "com.mycorp.mynamespace",
"type": "record"
}
1条答案
按热度按时间5kgi1eie1#
您可以尝试以下步骤:
将datagen连接器目录放在配置的插件路径中——对于分布式worker,您需要为所有worker执行此操作
重新启动所有工作进程
连接器应出现在控制中心(hit)的连接器列表页上
connect-ip:8083/connector-plugins
确认连接器是否正确加载)填写连接器属性--您可以参考以下repo以获取示例配置值:https://github.com/confluentinc/kafka-connect-datagen/tree/master/config (您也可以上载控制中心上的属性文件)
测试并继续,稍后启动接头