egg 关于多worker下消息队列重复消费 这样解决的方式对吗 或者说还有没有更好的解决方案

o2g1uqev  于 2022-10-27  发布在  其他
关注(0)|答案(2)|浏览(297)

因为在生成环境中会根据cpu分配多个进程,导致会出现重复消费的情况,阅读了一下官网的文档 发现有一个 agent.messenger.sendRandom() 方法 可以随机选择一个去执行; 所以我用来下面这样的方法去实现
也确实解决了重复消费的问题 , 但是我看官网上说:Worker 运行的是业务代码,相对会比 Agent 和 Master 进程上运行的代码复杂度更高,稳定性也低一点,当 Worker 进程异常退出时,Master 进程会重启一个 Worker 进程。
这会不会意味着刚好agent选中的那个进程还没有执行消费就挂掉了 然后重启了肯定又是一个新进程,或者还会不会有别的可能出现隐患的可能性(我是小白 可能还有别的可能性没遇到过,所以也很难想到,如果有请大佬补充一下);
然后就是暂时不考虑集群的话 除了这样做还有没有更好的解决方案 谢谢

//agent.js
module.exports = agent => {
    agent.messenger.on('egg-ready', () => {
        const ctx = agent.createAnonymousContext();
        const client = new kafka.KafkaClient({ kafkaHost: ctx.app.config.kafkaHost });
        const consumer = new kafka.Consumer(client, ctx.app.config.consumerTopics, {
            autoCommit: false,
        });
        agent.beforeStart(async function () {
            consumer.on('message', async function (message) {
                agent.messenger.sendRandom('kafka', message)
            });
        });
    });
};
//app.js
app.producer = producer;
   const consumer = new kafka.Consumer(client, ctx.app.config.consumerTopics, {
     autoCommit: false,
   });
   app.messenger.on('kafka', async message => {
     const nameMQ = message.topic
     console.log('nameMQ: ', nameMQ);
       await ctx.service.app.mq[nameMQ](JSON.parse(message.value));
       consumer.commit(true, (err, data) => {
         console.error('commit:', err, data);
       });
   });
fwzugrvs

fwzugrvs1#

这是肯定的,实际上框架只能保证 sendRandom 有 worker 分配到,但是无法保证 worker 处理完成,这一层需要应用逻辑去保证。

hiz5n14c

hiz5n14c2#

谢谢大佬的回复 这个我又用redis做了下处理;那就是除了我说的这种情况还有别可能出现的可能性吗 或者还有没有更好的解决方案

这是肯定的,实际上框架只能保证 sendRandom 有 worker 分配到,但是无法保证 worker 处理完成,这一层需要应用逻辑去保证。

相关问题