Flink SQL MATCH_RECOGNIZE gives incomplete results

fv2wmkja posted on 2021-06-24 in Flink

I feed the following data to Flink as a stream:

```
ID  Val eventTime.rowtime
266 25  9000
266 22  10000
266 19  11000
266 18  12000
266 16  13000
266 15  14000
266 14  15000
266 13  16000
266 14  17000
266 15  18000
266 17  19000
266 18  20000
266 18  21000
266 19  22000
266 21  23000
266 21  24000
266 21  25000
266 22  26000
266 21  27000
266 21  28000
266 22  29000
266 24  30000
266 23  31000
266 24  32000
266 25  33000
266 24  34000
266 22  35000
266 23  36000
266 24  37000
266 19  38000
```

I need to run a MATCH_RECOGNIZE query like the following:

```sql
Select ID, sts, ets, intervalValue, valueDescription, intvDuration from
RawEvents Match_Recognize (
    PARTITION BY ID
    ORDER BY eventTime
    MEASURES
        A.ID AS id,
        FIRST(A.eventTime) As sts,
        LAST(A.eventTime) As ets,
        MAX(A.val) As intervalValue,
        'max' As valueDescription,
        TIMESTAMPDIFF(SECOND, FIRST(A.eventTime), LAST(A.eventTime)) As intvDuration
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN (A+ B)
    DEFINE
        A as A.val >= 20,
        B As true)
```
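To make the matches this query should produce on the sample rows concrete, here is a plain-Java sketch with no Flink dependency. It is only an approximation of MATCH_RECOGNIZE semantics and rests on one assumption: `A+` is greedy, so `B` (always true) takes the first row after the `A` run that no longer satisfies `val >= 20`. With `AFTER MATCH SKIP TO NEXT ROW`, a new match attempt may start at every row. The class name `MatchSketch` and its helpers are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java approximation (no Flink) of
//   PATTERN (A+ B) DEFINE A AS A.val >= 20, B AS true
//   AFTER MATCH SKIP TO NEXT ROW
// ASSUMPTION: A+ is greedy, so B only takes the first row after the A run that fails A.
public class MatchSketch {

    // Sample rows for ID 266 from the question, in event-time order.
    static final double[] VALS = {
        25, 22, 19, 18, 16, 15, 14, 13, 14, 15, 17, 18, 18, 19, 21,
        21, 21, 22, 21, 21, 22, 24, 23, 24, 25, 24, 22, 23, 24, 19
    };
    // Event times 9000, 10000, ..., 38000 ms (one row per second).
    static final long[] TIMES = new long[VALS.length];
    static {
        for (int i = 0; i < TIMES.length; i++) {
            TIMES[i] = 9000L + 1000L * i;
        }
    }

    // One match: FIRST/LAST(A.eventTime), MAX(A.val), duration in whole seconds.
    static final class Interval {
        final long sts;
        final long ets;
        final double maxVal;
        final long durationSec;

        Interval(long sts, long ets, double maxVal, long durationSec) {
            this.sts = sts;
            this.ets = ets;
            this.maxVal = maxVal;
            this.durationSec = durationSec;
        }

        @Override
        public String toString() {
            return "(" + sts + ", " + ets + ", " + maxVal + ", " + durationSec + ")";
        }
    }

    static List<Interval> findMatches(long[] times, double[] vals) {
        List<Interval> out = new ArrayList<>();
        // SKIP TO NEXT ROW: a new match attempt may start at every row.
        for (int start = 0; start < vals.length; start++) {
            if (vals[start] < 20) {
                continue; // the first row of a match must satisfy A
            }
            int end = start;
            // Greedy A+: keep extending while the next row still satisfies A.
            while (end + 1 < vals.length && vals[end + 1] >= 20) {
                end++;
            }
            // B (always true) still needs one more row after the A run.
            if (end + 1 >= vals.length) {
                continue;
            }
            double max = vals[start];
            for (int i = start + 1; i <= end; i++) {
                max = Math.max(max, vals[i]);
            }
            out.add(new Interval(times[start], times[end], max, (times[end] - times[start]) / 1000));
        }
        return out;
    }

    public static void main(String[] args) {
        for (Interval iv : findMatches(TIMES, VALS)) {
            System.out.println(iv);
        }
    }
}
```

Under that greedy assumption the sketch yields 17 intervals: the two from the 9000 to 10000 run (the two records the query actually returned), plus one for each start row from 23000 through 37000, all ending at 37000 because the row at 38000 (val 19) closes the run as `B`.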

I expect the output to include intervals like these:

```
(266,1970-01-01 00:00:09.0,1970-01-01 00:00:10.0,25.0,max,1)
(266,1970-01-01 00:00:10.0,1970-01-01 00:00:10.0,22.0,max,0)
(266,1970-01-01 00:00:23.0,1970-01-01 00:00:23.0,22.0,max,0)
(266,1970-01-01 00:00:23.0,1970-01-01 00:00:24.0,22.0,max,0)
...
(266,1970-01-01 00:00:23.0,1970-01-01 00:00:37.0,22.0,max,0)
...
(266,1970-01-01 00:00:37.0,1970-01-01 00:00:37.0,22.0,max,0)
```

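But in practice I only get the first two records.
Below is my full code, which converts the stream to a table, runs the query, and converts the result back to a stream: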

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(10);

DataStream<String> stringStream = env.addSource(new LinearRoadSource("C:\\Work\\Data\\linear.csv"));
DataStream<SpeedEvent> speedStream = stringStream.map(new SpeedMapper()).setParallelism(1);
speedStream = speedStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SpeedEvent>() {
    private long maxTimestampSeen = 0;

    @Override
    public Watermark getCurrentWatermark() {
        // Watermark = largest event timestamp seen so far (no out-of-orderness allowance)
        return new Watermark(maxTimestampSeen);
    }

    @Override
    public long extractTimestamp(SpeedEvent temperatureEvent, long previousElementTimestamp) {
        long ts = temperatureEvent.getTimestamp();
        // if (temperatureEvent.getKey().equals("W"))
        maxTimestampSeen = Long.max(maxTimestampSeen, ts);
        return ts;
    }
}).setParallelism(1);

TupleTypeInfo<Tuple3<String, Double, Long>> inputTupleInfo = new TupleTypeInfo<>(
        Types.STRING(),
        Types.DOUBLE(),
        Types.LONG()
);

StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
// Register the timestamped stream; "eventTime.rowtime" exposes the record timestamp
// assigned above as the rowtime attribute
tableEnv.registerDataStream("RawEvents",
        speedStream.map((MapFunction<SpeedEvent, Tuple3<String, Double, Long>>) event ->
                new Tuple3<>(event.getKey(), event.getValue(), event.getTimestamp()))
                .returns(inputTupleInfo),
        "ID, val, eventTime.rowtime"
);

// Java (before text blocks) has no multi-line string literals, so the query is concatenated
Table intervalResult = tableEnv.sqlQuery(
        "Select ID, sts, ets, intervalValue, valueDescription, intvDuration from "
        + "RawEvents Match_Recognize ( "
        + "  PARTITION BY ID "
        + "  ORDER BY eventTime "
        + "  MEASURES "
        + "    A.ID AS id, "
        + "    FIRST(A.eventTime) As sts, "
        + "    LAST(A.eventTime) As ets, "
        + "    MAX(A.val) As intervalValue, "
        + "    'max' As valueDescription, "
        + "    TIMESTAMPDIFF(SECOND, FIRST(A.eventTime), LAST(A.eventTime)) As intvDuration "
        + "  AFTER MATCH SKIP TO NEXT ROW "
        + "  PATTERN (A+ B) "
        + "  DEFINE "
        + "    A as A.val >= 20, "
        + "    B As true)");

TupleTypeInfo<Tuple6<String, Timestamp, Timestamp, Double, String, Integer>> tupleTypeInterval = new TupleTypeInfo<>(
        Types.STRING(),
        Types.SQL_TIMESTAMP(),
        Types.SQL_TIMESTAMP(),
        Types.DOUBLE(),
        Types.STRING(),
        Types.INT()
);

DataStream<Tuple6<String, Timestamp, Timestamp, Double, String, Integer>> queryResultAsStream =
        tableEnv.toAppendStream(intervalResult, tupleTypeInterval);
queryResultAsStream.print();

env.execute();
```
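The assigner in the code above sets the watermark to the largest event timestamp seen so far, without subtracting any allowance for late rows. To check which watermark the event-time MATCH_RECOGNIZE operator receives after each row, here is a minimal plain-Java model of that bookkeeping. `WatermarkModel` is a hypothetical name, the model has no Flink dependency, and it assumes a watermark is emitted after every row, whereas in the job the periodic assigner is polled every 10 ms of processing time (`setAutoWatermarkInterval(10)`).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Flink) of the AssignerWithPeriodicWatermarks in the question:
// the watermark is simply the largest event timestamp seen so far.
public class WatermarkModel {
    private long maxTimestampSeen = 0;

    // Mirrors extractTimestamp(): remember the max, return the event's own timestamp.
    long extractTimestamp(long eventTs) {
        maxTimestampSeen = Long.max(maxTimestampSeen, eventTs);
        return eventTs;
    }

    // Mirrors getCurrentWatermark(): no out-of-orderness allowance is subtracted.
    long currentWatermark() {
        return maxTimestampSeen;
    }

    // ASSUMPTION: one watermark is observed right after each row.
    static List<Long> watermarksAfterEachRow(long[] eventTimes) {
        WatermarkModel model = new WatermarkModel();
        List<Long> result = new ArrayList<>();
        for (long ts : eventTimes) {
            model.extractTimestamp(ts);
            result.add(model.currentWatermark());
        }
        return result;
    }

    public static void main(String[] args) {
        long[] times = new long[30];
        for (int i = 0; i < times.length; i++) {
            times[i] = 9000L + 1000L * i; // 9000 .. 38000, as in the sample data
        }
        List<Long> wms = watermarksAfterEachRow(times);
        System.out.println("watermark after last row: " + wms.get(wms.size() - 1));
    }
}
```

With the sample timestamps the last watermark the model produces is 38000, the timestamp of the final row, so nothing in the assigner itself pushes the watermark beyond that row.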

Am I doing something wrong, or have I forgotten to do something?
I am using Flink 1.8.1.
