public class dws_user_profile_count_type {
public static void main(String[] args) throws Exception {
//TODO 1. Set up the execution environment
Configuration conf = new Configuration();
conf.setBoolean("table.exec.non-temporal-sort.enabled", true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// TODO 2. State backend settings
// env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
// env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
// env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
// env.getCheckpointConfig().enableExternalizedCheckpoints(
// CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
// );
// env.setRestartStrategy(RestartStrategies.failureRateRestart(
// 3, Time.days(1), Time.minutes(1)
// ));
// env.setStateBackend(new HashMapStateBackend());
// env.getCheckpointConfig().setCheckpointStorage(
// "hdfs://master/ck"
// );
// System.setProperty("HADOOP_USER_NAME", "root");
//TODO 3. Temporary table a
//1. Consume dwd_traffic_page_log with the Flink Kafka connector and create the table
tableEnv.executeSql("CREATE TABLE dwd_traffic_page_log(" +
" `common` MAP<STRING,STRING>, " +
" `page` MAP<STRING,STRING>, " +
" `ts_str` STRING " +
// " `pt` AS PROCTIME() " +
")" +
MyKafkaUtil.getKafkaDDL("dwd_traffic_page_log", "active"));
//2. Build the temporary view traffic_page from the fields the query needs
Table result = tableEnv.sqlQuery("select " +
"`common`['uid'] uid, " +
"`page`['during_time'] during_time, " +
"`page`['last_page_id'] last_page_id, " +
"TO_DATE(FROM_UNIXTIME(CAST(`ts_str` AS BIGINT) / 1000, 'yyyy-MM-dd')) `happen_date`, " +
"TO_TIMESTAMP(FROM_UNIXTIME(CAST(`ts_str` AS BIGINT)/1000, 'yyyy-MM-dd HH:mm:ss')) ts " +
// " `pt` " +
"from dwd_traffic_page_log");
tableEnv.createTemporaryView("traffic_page", result);
// tableEnv.sqlQuery("select TYPEOF(ts) from traffic_page").execute().print();
//3. Keep only the last 30 days of data from traffic_page and create the temporary table tmp_traffic_page
//Create the temporary table
tableEnv.executeSql("CREATE TEMPORARY TABLE tmp_traffic_page( " +
" uid STRING, " +
" during_time BIGINT, " +
" last_page_id STRING, " +
" happen_date DATE, " +
" `ts` TIMESTAMP(3), " +
" WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND)" +
MyKafkaUtil.getKafkaDDL("tmp_traffic_page","tmp"));
tableEnv.sqlQuery("SELECT " +
" uid, " +
" CAST(during_time AS BIGINT) during_time, " +
" last_page_id, " +
" happen_date, " +
" `ts` " +
"FROM traffic_page " +
"WHERE `happen_date` >= CURRENT_DATE - INTERVAL '30' DAY " +
"ORDER BY uid, `ts` "
).executeInsert("tmp_traffic_page");
//
//4. Apply DENSE_RANK() over tmp_traffic_page, partitioned by user id and ordered by ts
tableEnv.executeSql("CREATE TEMPORARY TABLE tmp_traffic_page2( " +
" uid STRING, " +
" during_time BIGINT, " +
" last_page_id STRING, " +
" happen_date DATE, " +
" `ts` TIMESTAMP(3), " +
" `rank` BIGINT " +
" ) " +
MyKafkaUtil.getKafkaDDL("tmp_traffic_page2","tmp2"));
tableEnv.sqlQuery("SELECT " +
"uid, " +
"during_time, " +
"last_page_id, " +
"happen_date, " +
"`ts`, " +
"DENSE_RANK() OVER (PARTITION BY uid ORDER BY `ts` ASC) AS `rank` " +
"FROM tmp_traffic_page " +
"ORDER BY happen , `ts` ASC"
).executeInsert("tmp_traffic_page2");
}
}
Please help me check my code for any problems. I set `("table.exec.non-temporal-sort.enabled", true)`, but it has no effect. I am using Flink 1.16.0.
This problem has bothered me for days. I am currently testing in IDEA, and even with the error message in hand, the chat AI could not help me solve it.
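In case it is relevant, here is a minimal variant of my setup that applies the same flag on the TableConfig after the table environment is created, instead of on the environment Configuration passed to getExecutionEnvironment (whether Flink 1.16 honors the key at this point is exactly what I am unsure about):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ConfigVariant {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Same key as in the code above, but set on the TableConfig;
        // I am not certain the streaming planner picks it up here either.
        tableEnv.getConfig().set("table.exec.non-temporal-sort.enabled", "true");
    }
}
```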
Here is the error message:
Exception in thread "main" org.apache.flink.table.api.TableException: Sort on a non-time-attribute field is not supported.
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSort.translateToPlanInternal(StreamExecSort.java:75)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:145)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158)
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:197)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1733)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:825)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:918)
at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:56)
at org.apache.flink.table.api.Table.executeInsert(Table.java:1064)
at com.tancong.app.dws.dws_user_profile_count_type.main(dws_user_profile_count_type.java:73)
Process finished with exit code 1
I need to partition by uid and order by ts to get a rank. ts is a TIMESTAMP, but it may not be a time attribute.
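One quick way to check whether a ts column is really a time attribute, rather than a plain TIMESTAMP, is to print the resolved schema: an event-time attribute is rendered with a *ROWTIME* marker. A sketch using the tableEnv and table names from the code above:

```java
// The `ts` in traffic_page comes from TO_TIMESTAMP with no watermark, so it
// should print as a plain TIMESTAMP(3); the `ts` in tmp_traffic_page was
// declared with a WATERMARK and should print as "TIMESTAMP(3) *ROWTIME*".
tableEnv.from("traffic_page").printSchema();
tableEnv.from("tmp_traffic_page").printSchema();
```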
1 Answer
Non-temporal sorting is not possible in streaming execution mode. Such a sort is fundamentally intractable: the input would have to be buffered forever. Perhaps you want to execute this job in batch mode?
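A minimal sketch of what that switch could look like (the sources would also have to be bounded, e.g. via the Kafka connector's bounded scan options, for the job to terminate):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchVariant {
    public static void main(String[] args) {
        // With bounded input, ORDER BY over arbitrary columns is legal:
        // the planner can sort the whole dataset before emitting results.
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // ... register the same tables and run the same INSERTs as above.
    }
}
```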
The option `table.exec.non-temporal-sort` does not exist (it does not appear anywhere in the source code). I believe the error comes from `ORDER BY happen`. I do not know where `happen` is defined, but it is probably not a time attribute.
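If the ranking has to stay in streaming mode, the supported route is Flink's Top-N pattern: a ROW_NUMBER() (the streaming planner does not accept DENSE_RANK here) wrapped in a filter on the generated row number. A sketch against the question's tmp_traffic_page, using the tableEnv from the code above and an arbitrary placeholder bound of 100:

```java
// Streaming-legal ranking: the outer filter on `rn` is what makes the planner
// recognize this as a Top-N query instead of an unsupported general sort.
tableEnv.sqlQuery(
        "SELECT uid, during_time, last_page_id, happen_date, `ts`, rn " +
        "FROM ( " +
        "  SELECT *, " +
        "         ROW_NUMBER() OVER (PARTITION BY uid ORDER BY `ts` ASC) AS rn " +
        "  FROM tmp_traffic_page " +
        ") " +
        "WHERE rn <= 100"   // placeholder bound; streaming Top-N needs a limit
).execute().print();
```

Note there is no trailing ORDER BY: the final sort over happen_date and ts is exactly the non-temporal sort the exception complains about.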