jdbc sink connector:如何将kafka消息中的字段Map到数据库表的列

amrnrhlw  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(423)

我正在使用confluent jdbc接收器连接器捕获从kafka主题到数据库的所有更改。我的消息是json格式,没有任何附加的模式。例如:

{ "key1": "value1", "key2": 100}

以下是我的配置:

name=sink-mysql-1
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=send_1
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
database.hostname=jdbc:mysql://0.0.0.0:3306/test_tbl
database.user=root
database.password=root
insert.mode=upsert
pk.mode=kafka
auto.create=true
auto.evolve=true

我遇到的问题是:由于遗留系统的原因,我无法更改消息格式。所以我的消息是没有模式信息的json对象。库是否支持Map字段?例如,在数据库下从字段aMap到字段b。
谢谢

cyej8jka

cyej8jka1#

要使用jdbc接收器,必须为数据声明一个模式。这意味着在实践中,您需要:
在avro中生成数据
以json格式生成具有预期模式/有效负载结构的数据
如果在将数据生成到kafka时没有该选项,则可以构建一个应用该模式的流处理阶段。您可以使用kafka streams或ksql之类的工具来实现这一点。它的输出是一个kafka主题,然后您将其用作kafka connect的源。在ksql中这样做的一个例子是:

-- Declare the schema of the source JSON topic
CREATE STREAM send_1_src (KEY1 VARCHAR, 
                          KEY2 INT) 
  WITH (KAFKA_TOPIC='send_1', 
        VALUE_FORMAT='JSON');

-- Run a continuous query populating the target topic `SEND_1_AVRO` 
-- with the data from `send_1` reserialised into Avro
CREATE STREAM SEND_1_AVRO 
  WITH (VALUE_FORMAT='AVRO') AS 
  SELECT * 
    FROM send_1_src;

要了解更多关于ksql的信息,请参阅这里。
您可以在这里的kafka教程中找到一些关于原始kafka消费者与kafka流与ksql流处理模式的好例子。

332nm8kg

332nm8kg2#

还有另一种选择,即在jdbc接收器连接器使用模式之前,编写使用者拦截器并将模式附加到值。
我试过了,成功了!

相关问题