如何使用apache flink计算同一数据集上的两个和

to94eoyn  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(382)

我有一个简单的流,如果数据是这样的形式:

id | name | eventType | eventTime
----------------------------------
1    A       PLAY        (ts of when the client fired the event)
1    B       IMPRESSION
2    A       CLICK

最终目标是计算eventtype click的事件总数除以按id和name分组的类型impression的eventtype之和,得到一个60秒的滚动窗口。
在纯sql中

SELECT d.id, d.name, d.impressionCount, d.clickCount,  d.clickCount / d.impressionCount * 100.0 FROM
( SELECT i.id, i.name, count(*) as clickCount, c.impressionCount from events as i
LEFT JOIN
 (
    SELECT id, name, count(*) as impressionCount from events WHERE event_type = 'IMPRESSION' GROUP BY id,name
 ) as c
ON i.id = c.id and i.name = c.name
WHERE event_type = 'CLICK' 
 GROUP BY i.id, i.name
) as d

所以我首先需要创建一个包含点击次数的列和一个包含印象次数的新列,然后使用该表进行除法。
我的问题是。。使用flink API最好的方法是什么?我试图这样做:

Table clickCountTable = eventsTable
                .where("eventType = 'CLICK'")
                .window(Tumble.over("1.minute").on("eventTime").as("minuteWindow"))
                .groupBy("id, name, minuteWindow")
                .select("concat(concat(id,'_'), name) as id, eventType.count as clickCount, minuteWindow.rowtime as minute");

同样的印象,然后我加入这两张table。但我没有得到正确的结果,我不确定这是最好的方式来实现我想做的使用tubling窗口。
编辑:
这就是我如何将流转换为表:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

[.....]
DataStream<EventWithCount> eventStreamWithTime = eventStream
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventWithCount>() {
                @Override
                public long extractAscendingTimestamp(EventWithCount element) {
                    try {
                        DateFormat df1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSSSS");
                        Date parsedDate = df1.parse(element.eventTime);
                        Timestamp timestamp = new java.sql.Timestamp(parsedDate.getTime());
                        return timestamp.getTime();
                    } catch (Exception e) {
                        throw new RuntimeException(e.getMessage());
                    }
                }});
tEnv.fromDataStream(eventStreamWithTime, "id, name, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);
a0zr77ik

a0zr77ik1#

您的表api查询可以计算 CLICK 事件依据 id 以及 name 每分钟看起来不错。

Table clickCountTable = eventsTable
  .where("eventType = 'CLICK'")
  .window(Tumble.over("1.minute").on("eventTime").as("minuteWindow"))
  .groupBy("id, name, minuteWindow")
  .select("concat(concat(id,'_'), name) as clickId, eventType.count as clickCount, minuteWindow.rowtime as clickMin");

对…也一样 IMPRESSION :

Table impressionCountTable = eventsTable
  .where("eventType = 'IMPRESSION'")
  .window(Tumble.over("1.minute").on("eventTime").as("minuteWindow"))
  .groupBy("id, name, minuteWindow")
  .select("concat(concat(id,'_'), name) as impId, eventType.count as impCount, minuteWindow.rowtime as impMin");

最后,必须联接两个表:

Table result = impressionCountTable
  .leftOuterJoin(clickCountTable, "impId = countId && impMin = countMin")
  .select("impId as id, impMin as minute, clickCount / impCount as ratio")

注意连接条件 impMin = countMin . 这将把连接变成一个最小窗口大小为1毫秒的时间窗口连接(ms是flinksql中的时间粒度)。
您说过,查询的行为与您预期的不符。你能更具体地说明你的预期结果和实际结果吗?

相关问题