I fed the following data to Flink as a single stream:
ID Val eventTime.rowtime
266 25 9000
266 22 10000
266 19 11000
266 18 12000
266 16 13000
266 15 14000
266 14 15000
266 13 16000
266 14 17000
266 15 18000
266 17 19000
266 18 20000
266 18 21000
266 19 22000
266 21 23000
266 21 24000
266 21 25000
266 22 26000
266 21 27000
266 21 28000
266 22 29000
266 24 30000
266 23 31000
266 24 32000
266 25 33000
266 24 34000
266 22 35000
266 23 36000
266 24 37000
266 19 38000
I need to run a SQL MATCH_RECOGNIZE query like the following:
SELECT ID, sts, ets, intervalValue, valueDescription, intvDuration
FROM RawEvents MATCH_RECOGNIZE (
    PARTITION BY ID
    ORDER BY eventTime
    MEASURES
        A.ID AS id,
        FIRST(A.eventTime) AS sts,
        LAST(A.eventTime) AS ets,
        MAX(A.val) AS intervalValue,
        'max' AS valueDescription,
        TIMESTAMPDIFF(SECOND, FIRST(A.eventTime), LAST(A.eventTime)) AS intvDuration
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN (A+ B)
    DEFINE
        A AS A.val >= 20,
        B AS true)
I expect the output to include intervals such as:
(266,1970-01-01 00:00:09.0,1970-01-01 00:00:10.0,25.0,max,1)
(266,1970-01-01 00:00:10.0,1970-01-01 00:00:10.0,22.0,max,0)
(266,1970-01-01 00:00:23.0,1970-01-01 00:00:23.0,22.0,max,0)
(266,1970-01-01 00:00:23.0,1970-01-01 00:00:24.0,22.0,max,0)
...
(266,1970-01-01 00:00:23.0,1970-01-01 00:00:37.0,22.0,max,0)
...
(266,1970-01-01 00:00:37.0,1970-01-01 00:00:37.0,22.0,max,0)
In practice, however, I only get the first two records.
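To make the pattern easier to reason about, here is a minimal plain-Java sketch that traces `PATTERN (A+ B)` with `AFTER MATCH SKIP TO NEXT ROW` over the Val column above. It is not Flink code and does not reproduce Flink's implementation. It assumes that `A+` is greedy, that at most one match is reported per starting row, and that `B` consumes the single row after the last `A` row. The class `PatternSketch` and its members are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class PatternSketch {

    /** One reported match: timestamps in milliseconds, duration in whole seconds. */
    static final class Match {
        final long sts;
        final long ets;
        final double max;
        final long seconds;

        Match(long sts, long ets, double max) {
            this.sts = sts;
            this.ets = ets;
            this.max = max;
            this.seconds = (ets - sts) / 1000; // mirrors TIMESTAMPDIFF(SECOND, sts, ets)
        }

        @Override
        public String toString() {
            return "(" + sts + "," + ets + "," + max + ",max," + seconds + ")";
        }
    }

    /** The Val column from the question, one row per second starting at 9000 ms. */
    static final double[] VALS = {
        25, 22, 19, 18, 16, 15, 14, 13, 14, 15,
        17, 18, 18, 19, 21, 21, 21, 22, 21, 21,
        22, 24, 23, 24, 25, 24, 22, 23, 24, 19
    };

    /** Event times 9000, 10000, ..., 38000 matching VALS row by row. */
    static long[] timestamps() {
        long[] ts = new long[VALS.length];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = 9000L + 1000L * i;
        }
        return ts;
    }

    /** Assumed semantics: greedy A+ (val >= threshold), then one B row (always true). */
    static List<Match> findMatches(long[] ts, double[] val, double threshold) {
        List<Match> out = new ArrayList<>();
        int n = ts.length;
        for (int start = 0; start < n; start++) { // SKIP TO NEXT ROW: every row may start a match
            int end = start;
            while (end < n && val[end] >= threshold) {
                end++; // greedy A+: extend while the row satisfies A
            }
            if (end == n) {
                end = n - 1; // no row left for B, so give the last row back to B
            }
            if (end <= start) {
                continue; // no A row available at this start
            }
            double max = val[start];
            for (int i = start + 1; i < end; i++) {
                max = Math.max(max, val[i]);
            }
            // A rows are [start, end), the B row is at index end
            out.add(new Match(ts[start], ts[end - 1], max));
        }
        return out;
    }

    public static void main(String[] args) {
        for (Match m : findMatches(timestamps(), VALS, 20.0)) {
            System.out.println(m);
        }
    }
}
```

Under those assumptions the sketch reports 17 matches: the two starting at 9000 and 10000, plus one for each starting row from 23000 to 37000, all of which end at 37000 because the greedy `A+` runs until the row at 38000 falls below 20. Whether Flink 1.8.1 follows exactly these semantics is not established here.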
Below is the complete code I use to convert the stream to a table and to turn the query result back into a stream:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(10);

DataStream<String> stringStream =
    env.addSource(new LinearRoadSource("C:\\Work\\Data\\linear.csv"));
DataStream<SpeedEvent> speedStream =
    stringStream.map(new SpeedMapper()).setParallelism(1);
speedStream = speedStream.assignTimestampsAndWatermarks(
    new AssignerWithPeriodicWatermarks<SpeedEvent>() {
        private long maxTimestampSeen = 0;

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(maxTimestampSeen);
        }

        @Override
        public long extractTimestamp(SpeedEvent temperatureEvent, long l) {
            long ts = temperatureEvent.getTimestamp();
            // if (temperatureEvent.getKey().equals("W"))
            maxTimestampSeen = Long.max(maxTimestampSeen, ts);
            return ts;
        }
    }).setParallelism(1);

TupleTypeInfo<Tuple3<String, Double, Long>> inputTupleInfo = new TupleTypeInfo<>(
    Types.STRING(),
    Types.DOUBLE(),
    Types.LONG()
);

StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
tableEnv.registerDataStream("RawEvents",
    keyedStream.map((MapFunction<SpeedEvent, Tuple3<String, Double, Long>>) event ->
        new Tuple3<>(event.getKey(), event.getValue(), event.getTimestamp()))
        .returns(inputTupleInfo),
    "ID, val, eventTime.rowtime"
);

Table intervalResult = tableEnv.sqlQuery(
    "SELECT ID, sts, ets, intervalValue, valueDescription, intvDuration " +
    "FROM RawEvents MATCH_RECOGNIZE ( " +
    "  PARTITION BY ID " +
    "  ORDER BY eventTime " +
    "  MEASURES " +
    "    A.ID AS id, " +
    "    FIRST(A.eventTime) AS sts, " +
    "    LAST(A.eventTime) AS ets, " +
    "    MAX(A.val) AS intervalValue, " +
    "    'max' AS valueDescription, " +
    "    TIMESTAMPDIFF(SECOND, FIRST(A.eventTime), LAST(A.eventTime)) AS intvDuration " +
    "  AFTER MATCH SKIP TO NEXT ROW " +
    "  PATTERN (A+ B) " +
    "  DEFINE " +
    "    A AS A.val >= 20, " +
    "    B AS true)");

TupleTypeInfo<Tuple6<String, Timestamp, Timestamp, Double, String, Integer>> tupleTypeInterval =
    new TupleTypeInfo<>(
        Types.STRING(),
        Types.SQL_TIMESTAMP(),
        Types.SQL_TIMESTAMP(),
        Types.DOUBLE(),
        Types.STRING(),
        Types.INT()
    );

DataStream<Tuple6<String, Timestamp, Timestamp, Double, String, Integer>> queryResultAsStream =
    tableEnv.toAppendStream(intervalResult, tupleTypeInterval);
queryResultAsStream.print();
Am I doing something wrong, or have I forgotten something?
I am using Flink 1.8.1.