我有这样一条管道:
env.addSource(kafkaConsumer)
.keyBy { value -> value.f0 }
.window(EventTimeSessionWindows.withGap(Time.minutes(2)))
.reduce(::reduceRecord)
.addSink(kafkaProducer)
我想用ttl终止键控数据。
一些博客文章指出我需要一个 ValueStateDescriptor
为了这个。我做了一个这样的:
val desc = ValueStateDescriptor("val state", MyKey::class.java)
desc.enableTimeToLive(ttlConfig)
但是,如何将这个描述符应用到我的管道中,以便它实际执行ttl到期?
1条答案
按热度按时间nnvyjq4y1#
您描述的管道没有使用任何键控状态,这将有助于设置状态ttl。管道中唯一键控的状态是会话窗口的内容,并且该状态将尽快被清除——会话关闭时(此外,由于您使用的是reduce函数,因此该状态只包含每个键的一个值。)
在大多数情况下,过期状态只与显式创建的状态相关,在这种情况下,您可以随时访问状态描述符,并可以将其配置为使用状态ttl。FlinkSQL确实代表您创建了可能不会自动过期的状态,在这种情况下,您需要使用空闲状态保留时间来配置它。cep库还代表您创建状态,在这种情况下,您应该确保您的模式最终匹配或超时。