同一时刻同一id的Flink块处理事件

yhuiod9q  于 2022-12-31  发布在  Apache
关注(0)|答案(1)|浏览(146)

我有一个flink应用程序,它处理数据流并将一些结果写入数据库。数据流是按id键控的。一个数据库操作可能需要相当长的时间(例如3分钟),并且只能是指定id键的一个操作,以防止锁定。此时,此接收操作无法使用parallell处理,必须将parallelism设置为1。

process
        .keyBy(new ProductKeySelector())
        .addSink(new ProductSink())
        .setParallelism(1)

我想用实际的处理id事件锁流,并采取另一个,无序,并等待,直到相同的id结束进程,然后运行进程到它。这将是像阻塞队列的进程。
更新:
示例:

kafkaKeyedStream
            .map(new MapToProductType())
            .keyBy(new ProductKeySelector())
            .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
            .reduce(new ProductAggregateFunction())
            .addSink(new ProductSink());

从Kafka来源我收到的数据:
enter image description here
如您所见,数据按窗口函数分组(数据中的第一个值是关键字),结果由接收函数处理。在这个例子中,假设处理每个数据部分需要20秒。因此,如果我有1个线程,这不是问题,因为下一个数据等待处理,但如果我设置并行度= 2,则第一部分数据仍将由一个线程处理。10秒后,另一个线程启动,用与第一个相同的密钥处理下一部分数据,这就在数据库上创建了一个锁。
我希望在一个线程已经在处理特定键下的数据的情况下,第二个线程不获取同一个键上的数据,而是获取不同的键,或者在没有其他键的情况下不执行任何操作

unhi4e5o

unhi4e5o1#

如果你的数据库操作需要3分钟,你不想使用普通的JDBC接收器,而是看看Flink的Async IO支持,你可能想使用keyBy(id),然后在定制的RichAsyncFunction操作符中,你可以跟踪是否有一个给定id的活动数据库请求。

相关问题