将kafka ktable同步到sql数据库的最简单方法是什么?

7gs2gvoe  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(407)

我已经使用ksql创建了一个流和一个来自该流的聚合表。

{  
   "ksql":"DROP Stream IF EXISTS StreamLegacyNames; DROP Stream IF EXISTS StreamLegacy; CREATE Stream  StreamLegacy (payload  STRUCT<AgeYr  varchar>)WITH (KAFKA_TOPIC='eip-legacy-13',VALUE_FORMAT='JSON' );  CREATE Stream  StreamLegacyNames As Select payload->AgeYr Age from StreamLegacy; Create Table DimAge As SELECT Age FROM StreamLegacyNames Group By Age;",
   "streamsProperties":{  
      "ksql.streams.auto.offset.reset":"earliest"
   }
}

将此代码导出到sql表的最简单方法是什么?我们将jdbc连接器用于topic,但我不清楚这是否适用于聚合ksql表(在本例中是dimage)。
即使我在jdbc connect配置文件中将主题设置为dimage和以下内容。

value.converter.schemas.enable=false

完整的配置文件是

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=PASSWORD
auto.evolve=true
topics=DIMAGE
tasks.max=1
connection.user=USER
value.converter.schemas.enable=false
auto.create=true
connection.url=jdbc:sqlserver://SERVER

我在连接器中收到以下错误。

Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

通过postman的ksql查询将ktable的格式显示为

{"row":{"columns":["83"]},"errorMessage":null,"finalMessage":null}
{"row":{"columns":["74"]},"errorMessage":null,"finalMessage":null}
{"row":{"columns":["36"]},"errorMessage":null,"finalMessage":null}
wmtdaxz3

wmtdaxz31#

当你 CREATE STREAM foo AS SELECT (“csas”)在ksql中,您正在创建一个新的kafka主题,并不断地用结果填充它 SELECT 声明。
所以你有一个Kafka主题,在你的例子中叫做 STREAMLEGACYNAMES (ksql通常将对象强制为大写)。您可以使用jdbcsink连接器将此主题流式传输到目标rdbms,包括mssql。

9wbgstp7

9wbgstp72#

ktable只是一天结束时的另一个主题。您可以使用ksql PRINT 或者 kafka-console-consumer 查看jdbc接收器连接器将获得什么数据。
如果您假设ksql表与sqlserver表完全匹配,那么它就不会匹配。在SQLServer表中,您将拥有发生在ktable上的每个“事件行”,包括空值,因为jdbc接收器尚不支持删除。
不确定您期望的是什么数据,但您可以做的是,对您试图捕获的事件执行窗口化输出,然后有效地将微批插入到您的downsteam数据库中。

相关问题