flink表api:通过JavaDSL和sql创建表?

mwkjh3gx  于 2021-07-15  发布在  Flink
关注(0)|答案(0)|浏览(278)

似乎“连接器表”和一个连接到外部系统(如kafka或jdbc)的表需要使用sqlapi,而不支持javadsl,以及 fromValues 简单的测试表需要使用javadslapi,不支持sqlapi。是这样吗?
下面通过javadsl创建一个包含几个简单测试行的flink表。这对于原型设计非常有用。我可以通过sql语法来实现吗?

Table sourceTable = tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("field_to_use_as_lookup_key", DataTypes.STRING()),
                        DataTypes.FIELD("extra_field", DataTypes.INT())
                ),
                Expressions.row("key_a", 1),
                Expressions.row("key_c", 2),
                Expressions.row("key_b", 3),
                Expressions.row("key_a", 4)
        );

上面的javadsl是否支持添加 proctime 以下文档中类似此sql示例的列:

CREATE TABLE MyUserTable (
  -- declare the schema of the table
  `user` BIGINT,
  `message` STRING,
  `rowtime` TIMESTAMP(3) METADATA FROM 'timestamp',    -- use a metadata column to access Kafka's record timestamp
  `proctime AS PROCTIME(),    -- use a computed column to define a proctime attribute
  WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '5' SECOND    -- use a WATERMARK statement to define a rowtime attribute
) WITH (
  -- declare the external system to connect to
  'connector' = 'kafka',
  'topic' = 'topic_name',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'   -- declare a format for this system
)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题