我必须将aurora/mysql的记录发送到msk,然后再从那里发送到ElasticSearch服务
极光-->Kafka连接--->aws msk--->Kafka连接--->ElasticSearch
aurora表结构中的记录是这样的
我想这张唱片会以这种格式传到aws msk。
"o36347-5d17-136a-9749-Oe46464",0,"NEW_CASE","WRLDCHK","o36347-5d17-136a-9749-Oe46464","<?xml version=""1.0"" encoding=""UTF-8"" standalone=""yes""?><caseCreatedPayload><batchDetails/>","CASE",08-JUL-17 10.02.32.217000000 PM,"TIME","UTC","ON","0a348753-5d1e-17a2-9749-3345,MN4,","","0a348753-5d1e-17af-9749-FGFDGDFV","EOUHEORHOE","2454-5d17-138e-9749-setwr23424","","","",,"","",""
所以为了使用ElasticSearch,我需要使用适当的模式,所以我必须使用模式注册表。
我的问题
问题1
对于上述类型的消息,我应该如何使用schema registry schema registry is required?。我必须为此创建json结构吗?如果是,我将它保存在哪里。需要更多的帮助才能理解这一点吗?
我编辑过
vim /usr/local/confluent/etc/schema-registry/schema-registry.properties
提到了zookeper,但我不知道是什么 kafkastore.topic=_schema
如何将此链接到自定义架构。
就连我也开始犯这个错误
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic _schemas not present in metadata after 60000 ms.
这是我所期待的,因为我没有做任何关于模式的事情。
我确实安装了jdbc连接器,当我开始的时候,我得到下面的错误
Invalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123 for configuration Couldn't open connection to jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
Invalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123 for configuration Couldn't open connection to jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
问题2我能否在一个ec2上创建两个连接器(jdbc和elastic serach one)。如果是,我必须在单独的cli中同时启动这两个连接器吗?
问题3当我打开vim/usr/local/confluent/etc/kafka connect jdbc/source-quickstart-sqlite.properties时,我只看到如下properties值
name=test-source-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
mode=incrementing
incrementing.column.name=id
topic.prefix=trf-aurora-fspaudit-
在上面的属性文件中,我可以提到模式名和表名吗?
根据答案,我正在更新kafka connect jdbc的配置
4条答案
按热度按时间vlurs2pr1#
-----启动jdbc连接ElasticSearch
bsxbgnwa2#
然后
然后我修改了下面的属性
上次我修改
我修改了下面的属性
当我列出主题时,我看不到为表名列出的任何主题。
错误消息的堆栈跟踪
cvxl0en23#
是否需要架构注册表?
不可以。您可以在json记录中启用架构。jdbc源代码可以根据表信息为您创建它们
提到了zookeper,但我没有提到什么是kafkastore.topic=\u schema
如果要使用schema registry,应该使用
kafkastore.bootstrap.servers
是Kafka的地址,不是Zookeeper。所以移除kafkastore.connection.url
请阅读所有属性的说明文件我没有做任何关于模式的事情。
没关系。schemas主题在注册表首次启动时创建
我可以在一个ec2上创建两个连接器吗
是(忽略可用的jvm堆空间)。同样,Kafka连接文档中对此进行了详细说明。
使用独立模式,首先要传递connect worker配置,然后在一个命令中最多传递n个连接器属性
使用分布式模式,可以使用kafka connect restapi
https://docs.confluent.io/current/connect/managing/configuring.html
当我打开vim/usr/local/confluent/etc/kafka connect jdbc/source-quickstart-sqlite.properties时
首先,这是针对sqlite的,而不是mysql/postgres。您不需要使用快速启动文件,它们只是作为参考
同样,所有属性都有很好的文档记录
https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#connect-jdbc公司
我确实安装了jdbc连接器,当我开始的时候,我得到下面的错误
下面是关于如何调试的更多信息
https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/
如前所述,我个人建议尽可能使用debezium/cdc
用于rds aurora的debezium连接器
ldioqlga4#
我猜您计划使用avro来传输数据,所以在启动kafka connect workers时不要忘记指定avroconverter作为默认转换器。如果您将使用json,那么就不需要模式注册表。
1.1
kafkastore.topic=_schema
您是否已启动自己的模式注册表?启动schema registry时,必须指定“schemas”主题。基本上,schema registry将使用这个主题来存储它注册的模式,如果出现故障,它可以从那里恢复它们。1.2
jdbc connector installed and when i start i get below error
默认情况下,jdbc连接器仅适用于sqlite和postgresql。如果您希望它与mysql数据库一起工作,那么您也应该将mysql驱动程序添加到类路径中。2.这取决于您如何部署kafka connect工作人员。如果您选择分布式模式(推荐),那么实际上不需要单独的cli。您可以通过kafka connect restapi部署连接器。
3.有另一个属性称为
table.whitelist
可以在其上指定模式和表。e、 g:表.白名单用户、产品、事务