flink cep不是确定性的

gcxthw6b  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(297)

我在没有群集的情况下在本地运行以下代码:

val count = new AtomicInteger()
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val text: DataStream[String] = env.readTextFile("file:///flink/data2")
val mapped: DataStream[Map[String, Any]] = text.map((x: String) => Map("user" -> x.split(",")(0), "val" -> x.split(",")(1)))
val pattern: ...
CEP.pattern(mapped, pattern).select(eventMap => {
  println("Found: " + (patternName, eventMap))
  count.incrementAndGet()
})

env.execute()
println(count)

我的数据是以下格式的csv文件(user,val):

1,1
1,2
1,3
2,1
2,2
2,3
...

我正试图检测出 event(val=1) -> event(val=2) -> event(val=3) . 当我在一个大的输入流上运行这个程序时,我知道流中存在一定数量的事件,我检测到的事件数不一致,几乎总是小于系统中的事件数。如果我这样做了 env.setParallelism(1) (就像我在代码第3行中所做的那样),所有事件都被检测到。
我假设问题是当并行度大于1时,多个线程正在处理流中的事件,这意味着当一个线程 event(val=1) -> event(val=2) , event(val=3) 可能被发送到另一个线程,而整个模式可能不会被检测到。
有什么我不知道的吗?我不能丢失流中的任何模式,但将parallelism设置为1似乎无法实现像flink这样的系统检测事件的目的。
更新:
我尝试使用以下方法设置流的键:

val mapped: KeyedStream[Map[String, Any]] = text.map(...).keyBy((m) => m.get("user"))

尽管这样可以防止不同用户的事件相互干扰:

1,1
2,2
1,3

这并不能阻止flink将事件无序地发送到节点,这意味着非确定性仍然存在。

svdrlsy4

svdrlsy41#

您是否考虑过使用userid(您的第一个值)为流设置密钥?flink保证一个键的所有事件都到达同一个处理节点。当然,这只会有帮助,如果您想检测每个用户的val=1->val=2->val=3的模式。

dz6r00yl

dz6r00yl2#

最可能的问题在于在map操作符之后应用keyby操作符。
所以,不是:

val mapped: KeyedStream[Map[String, Any]] = text.map(...).keyBy((m) => m.get("user"))

应该有:

val mapped: KeyedStream[Map[String, Any]] = text.keyBy((m) => m.get("user")).map(...)

我知道这是个老问题,但也许能帮上忙。

相关问题