我使用Azure服务总线队列收集需要发布到API的消息。我正在队列上使用Azure触发器函数来处理传入消息。不幸的是,API是不可靠的,所以我正在添加我自己的重试计划。
例如,第一次无法处理消息时,我想在15分钟内安排另一次重试,然后是1小时、2小时,依此类推。到达maxDeliveryCount
后,应将消息发送到DLQ。
消息的调度正在工作,但是,每个调度的消息都被视为新消息,这意味着deliveryCount
始终为1。我使用@azure/service-bus
NPM包在Azure函数中执行重新调度。
因此,我的问题是:如何在Azure触发器函数中重新计划SB队列消息,同时增加这些消息的deliveryCount
?
(我知道我可以将自己的deliveryCount属性添加到消息中,但我不想用元数据污染消息体。)
为了提供更多细节,这是一个MRE:
const serviceBusQueueTrigger: AzureFunction = async function (
context: Context,
msg: any
): Promise<void> {
context.log("ServiceBus queue trigger function processed message", msg);
const serviceBusClient = new ServiceBusClient(<connection_string>);
const sender = serviceBusClient.createSender(<queue_name>);
context.log("dequeueCount: ", context.bindingData.deliveryCount);
try {
// POST to API
} catch (error) {
// if an error is thrown here, trigger resubmits immediately
// I want to wait proportionally to current deliveryCount
await sender.scheduleMessages(
{
body: msgData,
contentType: "application/json",
},
// just an example of schedule time
new Date(Date.now() + Math.pow(2, context.bindingData.deliveryCount) * 60 * 1000)
);
}
await sender.close();
}
===
编辑:虽然Microsoft Docs仍然包含有关内置重试机制的(模糊的)细节(@Manish在他的回答中链接),但该功能似乎不再支持。相关的讨论可以在here中找到。
因此,我最初的问题是,如何使用Azure服务总线触发器函数构建此机制?谢谢!
host.json
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.0.0, 5.0.0)"
},
"concurrency": {
"dynamicConcurrencyEnabled": true,
"snapshotPersistenceEnabled": true
},
"extensions": {
"serviceBus": {
"clientRetryOptions": {
"mode": "exponential",
"tryTimeout": "00:01:00",
"delay": "00:01:00",
"maxDelay": "03:00:00",
"maxRetries": 3
}
}
}
}
function.json
{
"bindings": [
{
"name": "incoming",
"type": "serviceBusTrigger",
"direction": "in",
"queueName": "incoming",
"connection": "SERVICEBUS"
}
],
"retry": {
"strategy": "exponentialBackoff",
"maxRetryCount": 5,
"minimumInterval": "00:00:10",
"maximumInterval": "00:15:00"
},
"scriptFile": "../dist/trigger/index.js"
}
2条答案
按热度按时间c3frrgcw1#
问题是,每当你在队列中发送一条消息时,它就像对待一条新消息一样。
怎么修?
你不必在队列中推送新消息,只需删除try-catch,所有发送代码和函数应用程序将负责休息。
请确保您的function.json文件看起来像下面(代码复制自下面的链接)
更多信息-> https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus-trigger?tabs=python-v2%2Cin-process%2Cextensionv5&pivots=programming-language-javascript
对于指数重试,请确保您有host.json设置如下
x1c 0d1x更多信息-> https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus?tabs=in-process%2Cextensionv5%2Cextensionv3&pivots=programming-language-javascript
编辑1:在阅读github issue之后,我们可以尝试使用重试
编辑2:确保您已经安装了bundle
juud5qan2#
@Manish我可以确认function.json上的重试选项有效。
@picklepick,为了解决你的需求,你可以按照Manish的建议在function.json上使用重试选项,并将队列上的最大交付计数设置为1。这将允许使用function.json在函数级别完成所有重试,然后在完成并失败后将消息移动到DLQ。
您也可以使用2(功能级别和服务总线级别)的组合,即让函数retries完成,将消息移回队列,并让它再次执行函数retries。为此,您需要将最大交付计数设置为2。
因此,使用这些选项,您可以使用配置执行重试,而无需在函数本身中引入额外的重试代码。