我已经使用ksql创建了一个流和一个来自该流的聚合表。
{
"ksql":"DROP Stream IF EXISTS StreamLegacyNames; DROP Stream IF EXISTS StreamLegacy; CREATE Stream StreamLegacy (payload STRUCT<AgeYr varchar>)WITH (KAFKA_TOPIC='eip-legacy-13',VALUE_FORMAT='JSON' ); CREATE Stream StreamLegacyNames As Select payload->AgeYr Age from StreamLegacy; Create Table DimAge As SELECT Age FROM StreamLegacyNames Group By Age;",
"streamsProperties":{
"ksql.streams.auto.offset.reset":"earliest"
}
}
将此代码导出到sql表的最简单方法是什么?我们将jdbc连接器用于topic,但我不清楚这是否适用于聚合ksql表(在本例中是dimage)。
即使我在jdbc connect配置文件中将主题设置为dimage和以下内容。
value.converter.schemas.enable=false
完整的配置文件是
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=PASSWORD
auto.evolve=true
topics=DIMAGE
tasks.max=1
connection.user=USER
value.converter.schemas.enable=false
auto.create=true
connection.url=jdbc:sqlserver://SERVER
我在连接器中收到以下错误。
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
通过postman的ksql查询将ktable的格式显示为
{"row":{"columns":["83"]},"errorMessage":null,"finalMessage":null}
{"row":{"columns":["74"]},"errorMessage":null,"finalMessage":null}
{"row":{"columns":["36"]},"errorMessage":null,"finalMessage":null}
2条答案
按热度按时间wmtdaxz31#
当你
CREATE STREAM foo AS SELECT
(“csas”)在ksql中,您正在创建一个新的kafka主题,并不断地用结果填充它SELECT
声明。所以你有一个Kafka主题,在你的例子中叫做
STREAMLEGACYNAMES
(ksql通常将对象强制为大写)。您可以使用jdbcsink连接器将此主题流式传输到目标rdbms,包括mssql。9wbgstp72#
ktable只是一天结束时的另一个主题。您可以使用ksql
PRINT
或者kafka-console-consumer
查看jdbc接收器连接器将获得什么数据。如果您假设ksql表与sqlserver表完全匹配,那么它就不会匹配。在SQLServer表中,您将拥有发生在ktable上的每个“事件行”,包括空值,因为jdbc接收器尚不支持删除。
不确定您期望的是什么数据,但您可以做的是,对您试图捕获的事件执行窗口化输出,然后有效地将微批插入到您的downsteam数据库中。