我用FLINK SQL创建了一个记录。
CREATE TABLE en_trans (
`transid` INTEGER,
`productname` INTEGER,
PRIMARY KEY (transid) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'en_trans',
'properties.bootstrap.servers' = '....:9092',
'properties.group.id' = 'en_trans_group_test',
'key.format' = 'avro-confluent',
'value.format' = 'avro-confluent',
'key.avro-confluent.url' = 'http://kafka-netlex-cp-schema-registry:8081',
'value.avro-confluent.url' = 'http://kafka-netlex-cp-schema-registry:8081'
);
个字符
我出错了
org.apache.flink.table.api.ValidationException: Querying an unbounded table 'default_catalog.default_database.en_trans' in batch mode is not allowed. The table source is unbounded.
型
我不想要流模式,因为我需要提供这个作为休息服务,分页的目的。而不是STREAMING
模式。
2条答案
按热度按时间jtoj6r0c1#
切换到流媒体模式,您需要创建一个
StreamTableEnvironment
,而不是BatchTableEnvironment
。字符串
然后,注册表并运行查询
型
avwztpqn2#
我刚从here找到了答案
基本上只需要设置绑定模式就可以了,但它对
upsert-kafka
不起作用。我的意图是为那些可以浏览数据(kafkatopic)作为服务和分页的用户构建一个表作为服务。它通过
ksqldb
和flinksql
显示不工作