我有一个FIFO队列,里面有超过200万条消息。我想用lambda函数处理它们,但是10次轮询消息中有9次我得到的回复是队列为空。这绝对不是真的。我尝试更改为长轮询,但是没有帮助。下面是我轮询消息的代码。
import { create as listenToSqsQueue } from "sqs-consumer";
listenToSqsQueue({
queueUrl: Config.aws.sqsurl,
handleMessage: async function (message, done){
// do some work with `message`s
Promise.resolve(invokePoller(functionName, message, callback));
done();
}
,
batchSize: 10
}).on("empty", function() {
callback(undefined, "Queue is empty");
}).on("error", function(error: Error, message: any) {
console.error(error, message);
callback(error)
}).start();
3条答案
按热度按时间xeufq47z1#
我建议您尝试下面的步骤来了解FIFO队列的行为。
1.创建FIFO队列以进行测试
1.向队列发送两条消息
1.调用队列的receive-message API,应该会收到输入到队列中的第一条消息
1.多次调用接收消息API,对于后续尝试,您将得到空响应。30秒后(可见性超时),如果调用接收消息API,您将得到相同的消息。但是,您将永远不会从队列中接收到第二条消息。
1.删除在步骤3中收到的消息
1.调用API,您应该会收到发送到队列的第二条消息
队列使用者收到接收消息API的空响应的原因是队列使用者在处理接收到的消息后尚未删除该消息。如果FIFO队列传送了其他消息,但尚未删除已传送的消息,这可能违反“先进先出递送”的保证,因为第二消息可能在第一消息被处理之前被处理。如果考虑有多个队列使用者的情况,这种行为会变得更加明显。
总而言之,我建议您考虑删除一条消息作为对FIFO队列的通知,以指示队列消费者已成功处理该消息,并允许FIFO队列传递队列中的下一条消息。要修复客户端代码,您可以修改它,以便在成功处理消息后删除该消息。
pokxtpni2#
作为@Denis Weerasiri提出的解决方案的替代方案,您可以为每条消息设置不同的消息组ID,这样就可以绕过应用于同一组中的消息的FIFO限制。
参见AWS文档中的Using the Amazon SQS message group ID。
6kkfgxo03#
SQS有这样的警告,对于FIFO队列,当您接收到属于特定消息组ID的消息时,请注意以下事项:
1.您必须删除或移动当前接收呼叫中的留言,然后才能接收来自同一组ID的更多留言。注意:必须将消息从飞行中可用状态移动。
1.您无法接收其他消息组中的消息。
更多详情请参阅官方answer