flink'timewindow'操作没有为来自kafka示例文件的popularplacesfrom生成输出

iih3973s  于 2021-06-25  发布在  Flink
关注(0)|答案(3)|浏览(258)

我正在浏览来自dataartisans的flink教程资料,由于某种原因,当我访问示例文件popularplacesfromkafka.scala时,我没有收到任何发送到stdout的输出。

...
// find popular places
val popularSpots = rides
  // match ride to grid cell and event type (start or end)
  .map(new GridCellMatcher)
  // partition by cell id and event type
  .keyBy( k => k )
  // build sliding window
  .timeWindow(Time.minutes(15), Time.minutes(5))
  // count events in window
  .apply{ (key: (Int, Boolean), window, vals, out: Collector[(Int, Long, Boolean, Int)]) =>
    out.collect( (key._1, window.getEnd, key._2, vals.size) )
  }

// print result on stdout
    popularSpots.print()
...

我已经确认数据是从kafka中提取出来的,当它试图执行“timewindow”操作时,我似乎没有得到任何输出。如果我删除'timewindow'操作,我可以看到'keyby'数据被输出。我有什么明显的遗漏吗?

ne5o7dgx

ne5o7dgx1#

一般来说,flink作业可能不产生任何输出有几个原因,但一个非常常见的原因与水印有关。flink的事件时钟只在当前水印前进时前进,因此如果没有水印,事件时间窗口将永远不会启动。
在Flink训练演习的情况下,出租车乘坐来源产生水印为您。但是现在您使用的是kafka源代码,您必须实现一个时间戳提取器和水印生成器,然后调用 assignTimestampsAndWatermarks 在您的流上(请参阅文档)。一 BoundedOutOfOrdernessTimestampExtractor 如果延迟与写入kafka的作业配置的延迟相匹配,则可以正常工作。

rnmwe5a2

rnmwe5a22#

是否为源配置了适当的加速?默认情况下(没有加速因子),源模拟原始数据,也就是说,它以与原始数据相同的速率发出记录。这意味着产生1分钟的数据需要1分钟。
窗口操作符每5分钟聚合最后15分钟的数据。因此,窗口操作符生成第一个结果需要5分钟。
如果你把加速因子设置为600,你将在1秒内得到10分钟的数据。

fwzugrvs

fwzugrvs3#

万一有人有同样的问题这是我的问题。
我的kafka主题有多个分区,但是正在将所有测试数据生成到一个分区(0),一旦我有>1个kafka使用者,除了分配给分区0的使用者之外,所有使用者都不会接收任何数据,因此不会在操作符链上发送任何水印-这会导致窗口函数停止发送数据(这也是为什么在这些情况下处理时间可以正常工作的原因)。以下是一个相关的jira:
https://issues.apache.org/jira/browse/flink-5479

相关问题