Flink: adding a field from one event type to the other event types

pnwntuvh posted on 2023-03-11 in Apache

In my Kafka stream I have three types of events:

{etype: A, eid: 101, name: Max, my_id:30001}

{etype:B, eid: 101, age:21, my_id:30001}
{etype:C, eid: 101, car:honda, my_id:30005}

All of these events flow through the same topic. I want to end up with the following output topic:

{etype: A, eid: 101, name: Max, my_id:30001}

{etype:B, eid: 101, name: Max, age:21, my_id:30001}
{etype:C, eid: 101, name: Max, car:honda, my_id:30005}

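Basically, "name" appears in only one type of event, and I want to get that name into all the other events. I'm new to Flink, but it seems like it would be a good starting point for more complex processing. How can I do the above in Java?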


Answer #1 (huwehgph)

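You could create a pipeline keyed by something related to the name you care about (e.g. eid). This ensures that all incoming events sharing that identifier are routed through the same stateful process function (e.g. a KeyedProcessFunction).
With a stateful function you can store the important state for all objects that share the same eid (i.e. keyed state) and perform some basic enrichment, as shown below (a simple example in Kotlin using a generic JsonObject, which you can replace with your own class):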

// Assumes Gson's JsonObject (com.google.gson) as the generic event type
import com.google.gson.JsonObject
import org.apache.flink.api.common.state.MapState
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Function that accepts incoming events keyed by eid (String),
// as JsonObject, and outputs enriched events (JsonObject)
class EventEnrichmentFunction : KeyedProcessFunction<String, JsonObject, JsonObject>() {

    // Per-key state: properties stored as key-value pairs of strings
    private lateinit var state: MapState<String, String>

    override fun open(parameters: Configuration) {
        // Initialize the state (a map of key-value string pairs, scoped to the current eid)
        state = runtimeContext.getMapState(
            MapStateDescriptor("events", TypeInformation.of(String::class.java), TypeInformation.of(String::class.java))
        )
    }

    // Accepts an incoming event (JsonObject), enriches it
    // from state, and emits it (JsonObject)
    override fun processElement(event: JsonObject, ctx: Context, collector: Collector<JsonObject>) {
        if (state.isEmpty && event.has("name")) {
            // First event carrying a name for this eid:
            // remember it so later events can be enriched
            state.put("name", event.get("name").asString)
        }

        // Enrich events that lack a name with the one stored for this eid
        if (!event.has("name") && state.contains("name")) {
            event.addProperty("name", state.get("name"))
        }

        // Emit the (possibly enriched) event
        collector.collect(event)
    }
}
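Since the question asks for Java, here is a minimal plain-Java sketch of the same per-key logic that runs without a Flink cluster. It is an illustration under stated assumptions, not Flink code: a `HashMap` stands in for the `MapState` that Flink scopes to a single eid, and events are modeled as `Map<String, String>` instead of `JsonObject`. The class name `NameEnrichment` and the `event(...)` helper are hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the keyed enrichment logic (no Flink involved).
// One instance models the keyed state Flink keeps for a single eid.
class NameEnrichment {

    // Stand-in for Flink's per-key MapState<String, String>
    private final Map<String, String> state = new HashMap<>();

    // Mirrors processElement: remember the first name seen,
    // then copy it into events that lack one
    Map<String, String> process(Map<String, String> event) {
        Map<String, String> out = new LinkedHashMap<>(event);
        if (state.isEmpty() && out.containsKey("name")) {
            state.put("name", out.get("name"));
        }
        if (!out.containsKey("name") && state.containsKey("name")) {
            out.put("name", state.get("name"));
        }
        return out;
    }

    // Hypothetical helper: builds an event from alternating field names and values
    static Map<String, String> event(String... kv) {
        Map<String, String> m = new LinkedHashMap<>();
        for (int i = 0; i + 1 < kv.length; i += 2) {
            m.put(kv[i], kv[i + 1]);
        }
        return m;
    }

    public static void main(String[] args) {
        NameEnrichment fn = new NameEnrichment(); // state for eid 101
        List<Map<String, String>> events = List.of(
            event("etype", "A", "eid", "101", "name", "Max", "my_id", "30001"),
            event("etype", "B", "eid", "101", "age", "21", "my_id", "30001"),
            event("etype", "C", "eid", "101", "car", "honda", "my_id", "30005"));
        for (Map<String, String> e : events) {
            System.out.println(fn.process(e));
        }
    }
}
```

Running `main` prints the A event unchanged and the B and C events with `name=Max` added. In a real job the same two checks would live in `processElement` of a `KeyedProcessFunction`, with Flink's `MapState` (or a `ValueState<String>`) replacing the `HashMap`.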

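Basically, any keyed stateful function lets you retain information for a specific key (eid in this case), so later events sharing the same key can read the state left by earlier ones, and that state keeps building up over time. Note that an event is only enriched if the event carrying the name for its eid arrived first; with the logic above, anything earlier passes through without a name. The same pattern extends to more properties: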

// Remember each tracked property the first time an event
// with this id carries it, so we keep track of what we've seen
for (key in listOf("name", "another_property", "yet_another_one")) {
    if (event.has(key) && !state.contains(key)) {
        state.put(key, event.get(key).asString)
    }
}

// TODO: Add checks and conditionally enrich events as needed from state
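As noted, an event that arrives before the A event for its eid is passed through without a name. One possible way to handle this, a swapped-in variant that is not part of the original answer, is to hold such events back until the name is known. The plain-Java sketch below shows the idea; the `name` field and the `pending` list stand in for what would be keyed `ValueState` and `ListState` in Flink, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical variant: buffer events that arrive before the name is known,
// then release them enriched once a name-bearing event shows up.
// Plain Java; in Flink, `name` and `pending` would be keyed ValueState and ListState.
class BufferingNameEnrichment {

    private String name;                                                  // name seen for this eid, if any
    private final List<Map<String, String>> pending = new ArrayList<>(); // held-back events

    // Returns the events to emit downstream after processing this one
    List<Map<String, String>> process(Map<String, String> event) {
        List<Map<String, String>> out = new ArrayList<>();
        Map<String, String> copy = new LinkedHashMap<>(event);
        if (name == null && copy.containsKey("name")) {
            // First name-bearing event: remember it, emit it, then flush the buffer
            name = copy.get("name");
            out.add(copy);
            for (Map<String, String> held : pending) {
                held.put("name", name);
                out.add(held);
            }
            pending.clear();
        } else if (name == null) {
            // Name not known yet: hold the event back
            pending.add(copy);
        } else {
            // Name already known: enrich immediately if the event lacks one
            copy.putIfAbsent("name", name);
            out.add(copy);
        }
        return out;
    }

    // Hypothetical helper: builds an event from alternating field names and values
    static Map<String, String> event(String... kv) {
        Map<String, String> m = new LinkedHashMap<>();
        for (int i = 0; i + 1 < kv.length; i += 2) {
            m.put(kv[i], kv[i + 1]);
        }
        return m;
    }

    public static void main(String[] args) {
        BufferingNameEnrichment fn = new BufferingNameEnrichment(); // state for eid 101
        System.out.println(fn.process(event("etype", "B", "eid", "101", "age", "21")));    // held back
        System.out.println(fn.process(event("etype", "A", "eid", "101", "name", "Max"))); // A, then enriched B
        System.out.println(fn.process(event("etype", "C", "eid", "101", "car", "honda"))); // enriched C
    }
}
```

A caveat with this design: if the A event for an eid never arrives, its buffered events are held indefinitely, so a real job would likely need a timer or state TTL to flush or drop them.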

When defining the job itself, it might look something like this:

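events
   // Key by eid as a String; the key type (org.apache.flink.api.common.typeinfo.Types)
   // is passed explicitly so Flink does not have to infer it from the lambda
   .keyBy({ e -> e.get("eid").asString }, Types.STRING)
   .process(EventEnrichmentFunction())
   .sinkTo(...)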
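The `keyBy` step is what keeps each eid's state separate. A rough plain-Java picture of that routing, assuming events are `Map<String, String>` and using a hypothetical second id `102`: every event is routed by a key selector to its own state map, so the name stored for 101 is never visible to events with eid 102. This mimics only the per-key semantics; Flink additionally distributes keys across parallel task instances. The class and helper names are hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java picture of keyBy(...).process(...): each event is routed by a key
// selector to the state of its own key, so different eids never share state.
class KeyedPipelineSketch {

    // One state map per key, created on first use
    private final Map<String, Map<String, String>> stateByKey = new HashMap<>();
    private final Function<Map<String, String>, String> keySelector;

    KeyedPipelineSketch(Function<Map<String, String>, String> keySelector) {
        this.keySelector = keySelector;
    }

    Map<String, String> process(Map<String, String> event) {
        // Look up (or create) the state belonging to this event's key
        Map<String, String> state =
            stateByKey.computeIfAbsent(keySelector.apply(event), k -> new HashMap<>());
        Map<String, String> out = new LinkedHashMap<>(event);
        if (state.isEmpty() && out.containsKey("name")) {
            state.put("name", out.get("name"));
        }
        if (!out.containsKey("name") && state.containsKey("name")) {
            out.put("name", state.get("name"));
        }
        return out;
    }

    // Hypothetical helper: builds an event from alternating field names and values
    static Map<String, String> event(String... kv) {
        Map<String, String> m = new LinkedHashMap<>();
        for (int i = 0; i + 1 < kv.length; i += 2) {
            m.put(kv[i], kv[i + 1]);
        }
        return m;
    }

    public static void main(String[] args) {
        // Equivalent of keyBy on eid
        KeyedPipelineSketch pipeline = new KeyedPipelineSketch(e -> e.get("eid"));
        System.out.println(pipeline.process(event("etype", "A", "eid", "101", "name", "Max")));
        System.out.println(pipeline.process(event("etype", "B", "eid", "102", "age", "21")));    // different key: no name
        System.out.println(pipeline.process(event("etype", "C", "eid", "101", "car", "honda"))); // same key: name=Max
    }
}
```

In `main`, the B event for eid 102 comes out without a name, while the C event for eid 101 picks up `name=Max`.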
