我有一个简单的流,如果数据是这样的形式:
id | name | eventType | eventTime
----------------------------------
1 A PLAY (ts of when the client fired the event)
1 B IMPRESSION
2 A CLICK
最终目标是计算eventtype click的事件总数除以按id和name分组的类型impression的eventtype之和,得到一个60秒的滚动窗口。
在纯sql中
SELECT d.id, d.name, d.impressionCount, d.clickCount, d.clickCount / d.impressionCount * 100.0 FROM
( SELECT i.id, i.name, count(*) as clickCount, c.impressionCount from events as i
LEFT JOIN
(
SELECT id, name, count(*) as impressionCount from events WHERE event_type = 'IMPRESSION' GROUP BY id,name
) as c
ON i.id = c.id and i.name = c.name
WHERE event_type = 'CLICK'
GROUP BY i.id, i.name
) as d
所以我首先需要创建一个包含点击次数的列和一个包含印象次数的新列,然后使用该表进行除法。
我的问题是。。使用flink API最好的方法是什么?我试图这样做:
Table clickCountTable = eventsTable
.where("eventType = 'CLICK'")
.window(Tumble.over("1.minute").on("eventTime").as("minuteWindow"))
.groupBy("id, name, minuteWindow")
.select("concat(concat(id,'_'), name) as id, eventType.count as clickCount, minuteWindow.rowtime as minute");
同样的印象,然后我加入这两张table。但我没有得到正确的结果,我不确定这是最好的方式来实现我想做的使用tubling窗口。
编辑:
这就是我如何将流转换为表:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
[.....]
DataStream<EventWithCount> eventStreamWithTime = eventStream
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventWithCount>() {
@Override
public long extractAscendingTimestamp(EventWithCount element) {
try {
DateFormat df1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSSSS");
Date parsedDate = df1.parse(element.eventTime);
Timestamp timestamp = new java.sql.Timestamp(parsedDate.getTime());
return timestamp.getTime();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}});
tEnv.fromDataStream(eventStreamWithTime, "id, name, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);
1条答案
按热度按时间a0zr77ik1#
您的表api查询可以计算
CLICK
事件依据id
以及name
每分钟看起来不错。对…也一样
IMPRESSION
:最后,必须联接两个表:
注意连接条件
impMin = countMin
. 这将把连接变成一个最小窗口大小为1毫秒的时间窗口连接(ms是flinksql中的时间粒度)。您说过,查询的行为与您预期的不符。你能更具体地说明你的预期结果和实际结果吗?