在kafka使用者之后异步处理消息

zwghvu4y  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(448)

目前,我们正在使用apachenifi通过kafka消费者消费消息。kafka消费者的输出连接到db处理器,db处理器从消费者那里获取队列中的消息,并在其上运行存储的过程/处理。因此,db处理器将在每个from队列中处理一条消息,我可以将db处理器设置为在n个线程中并行工作,但主要是每个线程可以在每个队列中处理一条消息。
我想做如下事情:
一个接一个的处理器只会从队列中消耗消息(或获取消息),并说将等待“批处理”或总共1000条消息。
一旦它获得1000条消息或60秒的传递,并且消息计数<1000,就推送到另一个处理器,该处理器可以被db storage proc用于这些消息组上的业务逻辑。
主要是,我希望上面是多线程的,也就是说,如果我们得到3000条消息,第一个处理器将分3批读取它们,并推送到db处理器(并行)。
所以我想知道有没有这样的处理器可以完成上面第2点的任务,即只读取消息并根据批处理/时间规则将其推送到下一个?

mw3dktmi

mw3dktmi1#

如果您可以利用nifi的记录处理器,那么使用批量大小为1000的consumekafkarecord,然后使用putdatabaserecord,将获得与您所描述的类似的行为。
如果在使用时kafka主题中可能没有足够的可用消息,那么在中间添加mergecontent或mergerecord会让您等待一定的时间或消息数量。

相关问题