我想创建一个 Trigger
它第一次在20秒内发射,之后每5秒发射一次。我用过 GlobalWindows
还有一种习俗 Trigger
```
val windowedStream = valueStream
.keyBy(0)
.window(GlobalWindows.create())
.trigger(TradeTrigger.of())
这是密码 `TradeTrigger` :
@PublicEvolving
public class TradeTrigger extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
static boolean flag=false;
static long ctime = System.currentTimeMillis();
private TradeTrigger() {
}
@Override
public TriggerResult onElement(
Object arg0,
long arg1,
W arg2,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
throws Exception {
// TODO Auto-generated method stub
if(flag == false){
if((System.currentTimeMillis()-ctime) >= 20000){
flag = true;
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
} else {
if((System.currentTimeMillis()-ctime) >= 5000){
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(
long arg0,
W arg1,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
throws Exception {
// TODO Auto-generated method stub
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(
long arg0,
W arg1,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
throws Exception {
// TODO Auto-generated method stub
return TriggerResult.CONTINUE;
}
public static <W extends Window> TradeTrigger<W> of() {
return new TradeTrigger<>();
}
}
所以基本上,什么时候 `flag` 是 `false` ,即第一次 `Trigger` 应该在20秒内被炒鱿鱼 `flag` 至 `true` . 从下一次开始,它应该每5秒被发射一次。
我面临的问题是,每次 `Trigger` 被解雇了。也就是说,我在20秒后收到一条消息,每5秒收到一条消息。我期望每次触发时输出20条消息。
如果我使用 `.timeWindow(Time.seconds(5))` 创建一个5秒的时间窗口,每5秒输出20条消息。请帮我把代码弄好。有什么我不知道的吗?
2条答案
按热度按时间gr8qqesn1#
在费边和Flink邮件列表的帮助下,它成功了。将状态存储在
ValueState
变量通过TriggerContext
. 签入变量onEvent()
方法,如果是第一次注册processingTimeTimer
超过当前时间20秒并更新状态。在onProcessingTime
方法,注册另一个ProcessingTimeTimer
比当前时间多5秒,更新状态并触发Window
.1l5u6lss2#
触发器实现有几个问题:
永远不要将函数的状态存储在静态变量中。flink不会隔离JVM中的用户进程。相反,它为每个taskmanager使用一个jvm,并启动多个线程。因此,静态布尔标志在触发器的多个示例之间共享。你应该把Flink的国旗藏起来
ValueState
可从TriggerContext
. flink会注意检查你的状态,并在失败时恢复它。Trigger.onEvent()
仅在新事件到达时调用。因此它不能用于在特定时间触发窗口计算。相反,您应该注册事件时间计时器或处理时间计时器(再次通过TriggerContext
). 计时器将调用Trigger.onEventTime()
或者Trigger.onProcessingTime()
分别。是使用事件还是处理时间取决于您的用例。