我正在调查apache flink流媒体将用于我们的etl和机器学习平台。我还没有弄清楚的是如何将事件流化为“会话”。更具描述性:所有事件都包含会话id,为了丰富数据,我需要将属于一个会话的所有事件分组在一起。请考虑到事件是连续流入的(因此没有批处理支持,在此之后您可以简单地执行groupby,例如)
一种可能的解决方案是维护会话的lru缓存,并将所有传入事件排序到其关联的会话。然后,在每个会话x分钟不活动之后,可以“关闭”会话或从缓存中逐出会话。问题是如何在多租户系统中处理这个缓存;flink是否有分布式缓存的概念,或者它是否包含某种智能负载均衡器,事件被定向到网格中的同一分区?
更一般地说:使用流式api建立会话支持的最佳方法(用例和陷阱)是什么?这有可能吗?如何处理重放流(i、 e.从一个特定的时间点开始,在这个时间点上,不完整的会话(即时间点之前的事件)的事件流
对任何反馈、想法和/或建议感兴趣。
提前谢谢
2条答案
按热度按时间4nkexdtk1#
我创建了一个非常接近您要求的示例:https://gist.github.com/aljoscha/91b6422114eac814479f
我用一个
Tuple2<Integer,String>
模拟数据。整数是会话id,而字符串是我们对数据进行键控(分区)的某个字段。我建议你先看看
main()
方法,您可以看到程序的流程。其他位是自定义窗口定义SessionWindow
窗口分配者和SessionTrigger
. 这基本上实现了您建议的缓存思想。根据分配的窗口和密钥,窗口保存在缓冲区中。一旦触发,我们处理窗口,内容被逐出。当触发器接收到一个元素时,它会在未来10秒钟内注册一个计时器。如果此时没有新元素到达,触发器将触发。如果新元素在该时间窗口内到达,它将注册一个新的计时器,这将替换旧的计时器,因为触发器一次只能有一个活动计时器。
此外,这使用了所谓的处理时间窗口。这也可以更改为基于事件时间的触发,即元素的时间戳。
a14dhokn2#
会话可以通过以下方式从事件流中提取:
EventTimeSessionWindows
. 它将一个接一个出现的所有事件合并形成会话窗口,直到它们之间的间隔大于指定的值。如果流包含许多会话(可以通过sessionId
在每种情况下),则应首先按会话id分组,以便为每个会话分别设置会话窗口。在下面的代码示例中
转化为
// Apache Flink 1.1.4 with Kafka connector
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer09, FlinkKafkaProducer09}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector
object Main extends App {
val sessionTimeout = Time minutes 20
val kafkaCluster = "localhost:9092"
val inputTopic = "events"
val outputTopic = "sessions"
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties
properties.setProperty("bootstrap.servers", kafkaCluster)
properties.setProperty("group.id", "sessions")
val consumer = new FlinkKafkaConsumer09[String](inputTopic, new SimpleStringSchema, properties)
val producer = new FlinkKafkaProducer09[String](kafkaCluster, outputTopic, new SimpleStringSchema)
val stream =
env
.addSource(consumer)
.map(Formats.readEvent )
.keyBy(.session)
.window(ProcessingTimeSessionWindows withGap sessionTimeout)
.apply[SessionEvent] {
(key: String, window: TimeWindow, values: Iterable[Event], out: Collector[SessionEvent]) ⇒
val session =
SessionEvent(
key,
values.head.createdat.toInstant,
values.last.createdat.toInstant,
"end"
)
out.collect(session)
}
.map(Formats.writeSessionEvent _)
.addSink(producer)
env.execute("sessions")
}