我有一个flink应用程序,它处理数据流并将一些结果写入数据库。数据流是按id键控的。一个数据库操作可能需要相当长的时间(例如3分钟),并且只能是指定id键的一个操作,以防止锁定。此时,此接收操作无法使用parallell处理,必须将parallelism设置为1。
process
.keyBy(new ProductKeySelector())
.addSink(new ProductSink())
.setParallelism(1)
我想用实际的处理id事件锁流,并采取另一个,无序,并等待,直到相同的id结束进程,然后运行进程到它。这将是像阻塞队列的进程。
更新:
示例:
kafkaKeyedStream
.map(new MapToProductType())
.keyBy(new ProductKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new ProductAggregateFunction())
.addSink(new ProductSink());
从Kafka来源我收到的数据:
enter image description here
如您所见,数据按窗口函数分组(数据中的第一个值是关键字),结果由接收函数处理。在这个例子中,假设处理每个数据部分需要20秒。因此,如果我有1个线程,这不是问题,因为下一个数据等待处理,但如果我设置并行度= 2,则第一部分数据仍将由一个线程处理。10秒后,另一个线程启动,用与第一个相同的密钥处理下一部分数据,这就在数据库上创建了一个锁。
我希望在一个线程已经在处理特定键下的数据的情况下,第二个线程不获取同一个键上的数据,而是获取不同的键,或者在没有其他键的情况下不执行任何操作
1条答案
按热度按时间unhi4e5o1#
如果你的数据库操作需要3分钟,你不想使用普通的JDBC接收器,而是看看Flink的Async IO支持,你可能想使用
keyBy(id)
,然后在定制的RichAsyncFunction
操作符中,你可以跟踪是否有一个给定id的活动数据库请求。