因为在生成环境中会根据cpu分配多个进程,导致会出现重复消费的情况,阅读了一下官网的文档 发现有一个 agent.messenger.sendRandom()
方法 可以随机选择一个去执行; 所以我用来下面这样的方法去实现
也确实解决了重复消费的问题 , 但是我看官网上说:Worker 运行的是业务代码,相对会比 Agent 和 Master 进程上运行的代码复杂度更高,稳定性也低一点,当 Worker 进程异常退出时,Master 进程会重启一个 Worker 进程。
这会不会意味着刚好agent选中的那个进程还没有执行消费就挂掉了 然后重启了肯定又是一个新进程,或者还会不会有别的可能出现隐患的可能性(我是小白 可能还有别的可能性没遇到过,所以也很难想到,如果有请大佬补充一下);
然后就是暂时不考虑集群的话 除了这样做还有没有更好的解决方案 谢谢
//agent.js
module.exports = agent => {
agent.messenger.on('egg-ready', () => {
const ctx = agent.createAnonymousContext();
const client = new kafka.KafkaClient({ kafkaHost: ctx.app.config.kafkaHost });
const consumer = new kafka.Consumer(client, ctx.app.config.consumerTopics, {
autoCommit: false,
});
agent.beforeStart(async function () {
consumer.on('message', async function (message) {
agent.messenger.sendRandom('kafka', message)
});
});
});
};
//app.js
app.producer = producer;
const consumer = new kafka.Consumer(client, ctx.app.config.consumerTopics, {
autoCommit: false,
});
app.messenger.on('kafka', async message => {
const nameMQ = message.topic
console.log('nameMQ: ', nameMQ);
await ctx.service.app.mq[nameMQ](JSON.parse(message.value));
consumer.commit(true, (err, data) => {
console.error('commit:', err, data);
});
});
2条答案
按热度按时间fwzugrvs1#
这是肯定的,实际上框架只能保证 sendRandom 有 worker 分配到,但是无法保证 worker 处理完成,这一层需要应用逻辑去保证。
hiz5n14c2#
谢谢大佬的回复 这个我又用redis做了下处理;那就是除了我说的这种情况还有别可能出现的可能性吗 或者还有没有更好的解决方案
这是肯定的,实际上框架只能保证 sendRandom 有 worker 分配到,但是无法保证 worker 处理完成,这一层需要应用逻辑去保证。