apache flink会话支持

mwngjboj  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(299)

我正在调查apache flink流媒体将用于我们的etl和机器学习平台。我还没有弄清楚的是如何将事件流化为“会话”。更具描述性:所有事件都包含会话id,为了丰富数据,我需要将属于一个会话的所有事件分组在一起。请考虑到事件是连续流入的(因此没有批处理支持,在此之后您可以简单地执行groupby,例如)
一种可能的解决方案是维护会话的lru缓存,并将所有传入事件排序到其关联的会话。然后,在每个会话x分钟不活动之后,可以“关闭”会话或从缓存中逐出会话。问题是如何在多租户系统中处理这个缓存;flink是否有分布式缓存的概念,或者它是否包含某种智能负载均衡器,事件被定向到网格中的同一分区?
更一般地说:使用流式api建立会话支持的最佳方法(用例和陷阱)是什么?这有可能吗?如何处理重放流(i、 e.从一个特定的时间点开始,在这个时间点上,不完整的会话(即时间点之前的事件)的事件流
对任何反馈、想法和/或建议感兴趣。
提前谢谢

4nkexdtk

4nkexdtk1#

我创建了一个非常接近您要求的示例:https://gist.github.com/aljoscha/91b6422114eac814479f
我用一个 Tuple2<Integer,String> 模拟数据。整数是会话id,而字符串是我们对数据进行键控(分区)的某个字段。
我建议你先看看 main() 方法,您可以看到程序的流程。其他位是自定义窗口定义 SessionWindow 窗口分配者和 SessionTrigger . 这基本上实现了您建议的缓存思想。根据分配的窗口和密钥,窗口保存在缓冲区中。一旦触发,我们处理窗口,内容被逐出。
当触发器接收到一个元素时,它会在未来10秒钟内注册一个计时器。如果此时没有新元素到达,触发器将触发。如果新元素在该时间窗口内到达,它将注册一个新的计时器,这将替换旧的计时器,因为触发器一次只能有一个活动计时器。
此外,这使用了所谓的处理时间窗口。这也可以更改为基于事件时间的触发,即元素的时间戳。

a14dhokn

a14dhokn2#

会话可以通过以下方式从事件流中提取: EventTimeSessionWindows . 它将一个接一个出现的所有事件合并形成会话窗口,直到它们之间的间隔大于指定的值。如果流包含许多会话(可以通过 sessionId 在每种情况下),则应首先按会话id分组,以便为每个会话分别设置会话窗口。
在下面的代码示例中

case class Event(
    createdat: Timestamp,
    session: String
)

转化为

case class SessionEvent(
    sessionId: String,
    start: Instant,
    end: Instant,
    `type`: String
)
``` `SessionEvent` 在过去20分钟内没有任何事件时发出( `sessionTimeout` ).

// Apache Flink 1.1.4 with Kafka connector

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer09, FlinkKafkaProducer09}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector

object Main extends App {

val sessionTimeout = Time minutes 20
val kafkaCluster = "localhost:9092"
val inputTopic = "events"
val outputTopic = "sessions"

val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties
properties.setProperty("bootstrap.servers", kafkaCluster)
properties.setProperty("group.id", "sessions")

val consumer = new FlinkKafkaConsumer09[String](inputTopic, new SimpleStringSchema, properties)
val producer = new FlinkKafkaProducer09[String](kafkaCluster, outputTopic, new SimpleStringSchema)

val stream =
env
.addSource(consumer)
.map(Formats.readEvent )
.keyBy(
.session)
.window(ProcessingTimeSessionWindows withGap sessionTimeout)
.apply[SessionEvent] {
(key: String, window: TimeWindow, values: Iterable[Event], out: Collector[SessionEvent]) ⇒
val session =
SessionEvent(
key,
values.head.createdat.toInstant,
values.last.createdat.toInstant,
"end"
)
out.collect(session)
}
.map(Formats.writeSessionEvent _)
.addSink(producer)

env.execute("sessions")
}

相关问题