When I use Flink SQL and call DENSE_RANK(), I am sure the ordering column is temporal, but it still reports "Sort on a non-time-attribute field is not supported."

3htmauhk  published 2023-04-18  in  Apache
public class dws_user_profile_count_type {
    public static void main(String[] args) throws Exception {
        //TODO 1. Obtain the execution environment
        Configuration conf = new Configuration();
        conf.setBoolean("table.exec.non-temporal-sort.enabled", true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // TODO 2. State backend settings
//        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
//        env.getCheckpointConfig().enableExternalizedCheckpoints(
//                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
//        );
//        env.setRestartStrategy(RestartStrategies.failureRateRestart(
//                3, Time.days(1), Time.minutes(1)
//        ));
//        env.setStateBackend(new HashMapStateBackend());
//        env.getCheckpointConfig().setCheckpointStorage(
//                "hdfs://master/ck"
//        );
//        System.setProperty("HADOOP_USER_NAME", "root");
        //TODO 3. Temporary table a
        //1. Consume dwd_traffic_page_log with the Flink Kafka connector and create the table
        tableEnv.executeSql("CREATE TABLE dwd_traffic_page_log(" +
                "  `common` MAP<STRING,STRING>,  " +
                "  `page` MAP<STRING,STRING>,  " +
                "  `ts_str` STRING " +
//                "   `pt` AS PROCTIME()  " +
                ")" +
                MyKafkaUtil.getKafkaDDL("dwd_traffic_page_log", "active"));
        //2. Build the temporary view traffic_page from the fields needed by the query
        Table result = tableEnv.sqlQuery("select " +
                "`common`['uid'] uid, " +
                "`page`['during_time'] during_time, " +
                "`page`['last_page_id'] last_page_id, " +
                "TO_DATE(FROM_UNIXTIME(CAST(`ts_str` AS BIGINT) / 1000, 'yyyy-MM-dd')) `happen_date`, " +
                "TO_TIMESTAMP(FROM_UNIXTIME(CAST(`ts_str` AS BIGINT)/1000, 'yyyy-MM-dd HH:mm:ss')) ts " +
//                " `pt` " +
                "from dwd_traffic_page_log");
        tableEnv.createTemporaryView("traffic_page", result);
//        tableEnv.sqlQuery("select TYPEOF(ts) from traffic_page").execute().print();
        //3. Filter the last 30 days of data from traffic_page into the temporary table tmp_traffic_page
        //Create the temporary table
        tableEnv.executeSql("CREATE TEMPORARY TABLE tmp_traffic_page(   " +
                "   uid STRING,   " +
                "   during_time BIGINT,   " +
                "   last_page_id STRING,   " +
                "   happen_date DATE,   " +
                "   `ts` TIMESTAMP(3), " +
                "   WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND)" +
                MyKafkaUtil.getKafkaDDL("tmp_traffic_page","tmp"));
        tableEnv.sqlQuery("SELECT   " +
                " uid,  " +
                " CAST(during_time AS BIGINT) during_time,  " +
                " last_page_id,  " +
                " happen_date,  " +
                " `ts`  " +
                "FROM traffic_page   " +
                "WHERE `happen_date` >= CURRENT_DATE - INTERVAL '30' DAY " +
                "ORDER BY uid, `ts` "
        ).executeInsert("tmp_traffic_page");
//
        //4. Apply DENSE_RANK() over the temporary table tmp_traffic_page, partitioned by uid and ordered by ts
        tableEnv.executeSql("CREATE TEMPORARY TABLE tmp_traffic_page2(  " +
                "   uid STRING,   " +
                "   during_time BIGINT,   " +
                "   last_page_id STRING,   " +
                "   happen_date DATE,  " +
                "   `ts` TIMESTAMP(3), " +
                "   `rank` BIGINT " +
                "   )  " +
                MyKafkaUtil.getKafkaDDL("tmp_traffic_page2","tmp2"));
        tableEnv.sqlQuery("SELECT " +
                "uid, " +
                "during_time, " +
                "last_page_id, " +
                "happen_date, " +
                "`ts`, " +
                "DENSE_RANK() OVER (PARTITION BY uid ORDER BY `ts` ASC) AS `rank` " +
                "FROM tmp_traffic_page " +
                "ORDER BY happen , `ts` ASC"
        ).executeInsert("tmp_traffic_page2");
    }
}

Please check my code and see whether anything is wrong. I set ("table.exec.non-temporal-sort.enabled", true), but it does not take effect. I am using Flink 1.16.0.
This problem has been bothering me for days. I am currently testing in IDEA, and even with the error reported, chatAI could not help me solve it.
Here is the error message:

Exception in thread "main" org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSort.translateToPlanInternal(StreamExecSort.java:75)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:145)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158)
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:197)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1733)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:825)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:918)
    at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:56)
    at org.apache.flink.table.api.Table.executeInsert(Table.java:1064)
    at com.tancong.app.dws.dws_user_profile_count_type.main(dws_user_profile_count_type.java:73)

Process finished with exit code 1

I need to partition by uid and order by ts to get the rank. ts is a TIMESTAMP, but it may not be a time attribute.
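To be clear about the result I am after: what DENSE_RANK() OVER (PARTITION BY uid ORDER BY ts ASC) should compute can be sketched in plain Java, outside Flink (the class and method names here are made up for illustration only):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DenseRankSketch {

    // Emulates DENSE_RANK() OVER (PARTITION BY uid ORDER BY ts ASC):
    // within each uid partition rows are sorted by ts, equal ts values
    // share a rank, and the rank sequence has no gaps (1, 1, 2, 3, ...).
    static List<String> denseRank(Map<String, List<Long>> eventsByUid) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Long>> e : eventsByUid.entrySet()) {
            List<Long> sorted = new ArrayList<>(e.getValue());
            sorted.sort(Long::compare);
            long rank = 0;
            Long prev = null;
            for (Long ts : sorted) {
                if (!ts.equals(prev)) {
                    rank++;            // advance only when ts changes
                    prev = ts;
                }
                out.add(e.getKey() + ",ts=" + ts + ",rank=" + rank);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> events = new LinkedHashMap<>();
        events.put("u1", List.of(30L, 10L, 10L, 20L));
        events.put("u2", List.of(5L));
        denseRank(events).forEach(System.out::println);
    }
}
```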


o2gm4chl1#

A non-temporal sort is not possible in streaming execution mode. Such a sort is fundamentally intractable: the input would have to be buffered forever. Perhaps you want to execute this job in batch mode?
The option table.exec.non-temporal-sort does not exist (it does not appear anywhere in the source code).
I believe the error comes from ORDER BY happen. I don't know where happen is defined, but it is probably not a time attribute.
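For reference, switching to batch mode is only a change to the environment setup at the top of main. A minimal, untested sketch (it assumes the Flink 1.16 dependencies are on the classpath; note that batch mode also requires bounded inputs, so an unbounded Kafka source would need a bounded scan configuration as well):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Create the table environment in batch mode, where a full
// (non-temporal) ORDER BY is supported.
StreamTableEnvironment tableEnv =
        StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());
```

In batch mode the planner no longer routes the query through StreamExecSort, so the "Sort on a non-time-attribute field" restriction does not apply.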
