在Kafka的作品流中,我有三种类型的事件:
{etype: A, eid: 101, name: Max, my_id:30001}
{etype:B, eid: 101, age:21, my_id:30001}
{etype:C, eid: 101, car:honda, my_id:30005}
所有这些事件都在同一个主题中流动。我想以下面的出口主题结束:
{etype: A, eid: 101, name: Max, my_id:30001}
{etype:B, eid: 101, name: Max, age:21, my_id:30001}
{etype:C, eid: 101, name: Max, car:honda, my_id:30005}
基本上,“name”在一种类型的事件中,而我想得到所有其他事件中的name。我是Flink的新手,但似乎它会是更复杂处理的一个很好的起点。我如何在Java中做上述的事情呢?
1条答案
按热度按时间huwehgph1#
你可以考虑创建一个管道,用与你关心的名字相关的东西作为键(例如
eid
),这样可以确保所有流入的事件共享这个标识符,并通过同一个有状态的进程函数(例如KeyedProcessFunction
)进行路由。使用有状态函数可以存储共享同一个
eid
(即KeyedState
)的所有对象的重要状态信息,并且可以执行一些基本的扩充,如下所示(一个简单的示例使用了一些通用的JsonObject
,但您可以用自己的类替换它):基本上,任何键控的有状态函数都允许您基于特定键(在本例中为
eid
)保留信息,以便共享同一个键的未来事件可以访问它前面的那些键的状态,并随着时间的推移继续构建未来的匹配事件:当定义你的工作本身时,它可能看起来像这样: