如何使用Kafka在FlinkSQL中运行批处理模式主题

oaxa6hgo  于 2023-11-15  发布在  Apache
关注(0)|答案(2)|浏览(184)

我用FLINK SQL创建了一个记录。

CREATE TABLE en_trans (
  `transid` INTEGER,
  `productname` INTEGER,
   PRIMARY KEY (transid) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'en_trans',
  'properties.bootstrap.servers' = '....:9092',
  'properties.group.id' = 'en_trans_group_test',
  'key.format' = 'avro-confluent',
  'value.format' = 'avro-confluent',
  'key.avro-confluent.url' = 'http://kafka-netlex-cp-schema-registry:8081',
  'value.avro-confluent.url' = 'http://kafka-netlex-cp-schema-registry:8081'
);

个字符
我出错了

org.apache.flink.table.api.ValidationException: Querying an unbounded table 'default_catalog.default_database.en_trans' in batch mode is not allowed. The table source is unbounded.


我不想要流模式,因为我需要提供这个作为休息服务,分页的目的。而不是STREAMING模式。

jtoj6r0c

jtoj6r0c1#

切换到流媒体模式,您需要创建一个StreamTableEnvironment,而不是BatchTableEnvironment

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

字符串
然后,注册表并运行查询

tEnv.executeSql("CREATE TABLE en_trans (...) WITH (...)");
tEnv.sqlQuery("SELECT * FROM en_trans WHERE transid=123").execute().print();

avwztpqn

avwztpqn2#

我刚从here找到了答案
基本上只需要设置绑定模式就可以了,但它对upsert-kafka不起作用。我的意图是为那些可以浏览数据(kafkatopic)作为服务和分页的用户构建一个表作为服务。
它通过ksqldbflinksql显示不工作

相关问题