在Azure Function应用程序中限制Azure存储队列处理

sq1bmfud  于 2023-06-24  发布在  其他
关注(0)|答案(4)|浏览(122)

我已经创建了一个Azure Function应用程序,其中包含一个Azure存储队列触发器,该触发器处理一个队列,其中每个队列项都是一个URL。该函数只是下载URL的内容。我还有另一个函数,它加载和解析站点的XMLSitemap,并将所有页面URL添加到队列中。我遇到的问题是函数应用程序运行得太快,它锤击网站,所以它开始返回服务器错误。是否有方法限制/调节Functions应用程序的运行速度?
当然,我可以编写一个简单的Web作业来串行处理它们(或者使用一些异步但限制并发请求的数量),但我真的很喜欢Azure函数的简单性,并希望尝试“无服务器”计算。

lo8azlld

lo8azlld1#

有几个选项你可以考虑。
首先,您可以在host.json中配置一些控制队列处理的旋钮(here文档)。queues.batchSize旋钮表示一次获取多少队列消息。如果设置为1,则运行库将一次获取1条消息,并且仅在该消息的处理完成时获取下一条消息。这可以为您在 * 单个示例 * 上提供某种级别的序列化。
另一种选择是,您可以在入队的消息上设置NextVisibleTime,使它们间隔开-默认情况下,入队的消息变为可见,并准备立即处理。
最后一个选项可能是将消息与站点的所有URL的集合一起入队,而不是一次一个,因此当处理消息时,您可以在函数中串行处理URL,并以这种方式限制并行性。

gtlvzcf8

gtlvzcf82#

如果有多个并行函数添加到队列中,NextVisibleTime可能会变得混乱。另一个简单的选择,任何人都有这个问题:创建另一个队列“throttled-items”,并让您的原始函数跟随队列触发器。然后,添加一个简单的计时器函数,它每分钟从原始队列中移动消息,相应地间隔NextVisibleTime

[FunctionName("ThrottleQueueItems")]
    public static async Task Run([TimerTrigger("0 * * * * *")] TimerInfo timer, ILogger logger)
    {
        var originalQueue = // get original queue here;
        var throttledQueue = // get throttled queue here;
        var itemsPerMinute = 60; // get from app settings
        var individualDelay = 60.0 / itemsPerMinute;
        var totalRetrieved = 0;
        var maxItemsInBatch = 32; // change if you modify the default queue config
        do
        {
            var pending = (await originalQueue.GetMessagesAsync(Math.Min(maxItemsInBatch, itemsPerMinute - totalRetrieved))).ToArray();
            if (!pending.Any())
                break;
            foreach (var message in pending)
            {
                await throttledQueue.AddMessageAsync(new CloudQueueMessage(message.AsString), null,
                                                                                        TimeSpan.FromSeconds(individualDelay * ++totalRetrieved), null, null);
                await originalQueue.DeleteMessageAsync(message);
            }
        } while (itemsPerMinute > totalRetrieved);
    }
guicsvcw

guicsvcw3#

我在试图解决类似问题时发现了这篇文章。这可能对任何到达这里的人有用。现在,您可以使用WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT应用设置限制函数的并发示例数。将此值设置为1,并将批处理限制设置为1,将允许您执行队列的串行处理。

WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT

  • 函数应用可以扩展到的最大示例数。默认值为无限制。*

https://learn.microsoft.com/en-gb/azure/azure-functions/functions-app-settings#website_max_dynamic_application_scale_out

omqzjyyz

omqzjyyz4#

使用ThrottlingTroll,这可以如下实现。
1.这样配置HttpClient:

static HttpClient myThrottledHttpClient = new HttpClient
(
    new ThrottlingTrollHandler
    (
        // Consider using RedisCounterStore instead, so that rate counters are stored in a distributed cache 
        counterStore: new MemoryCacheCounterStore(),

        config: new ThrottlingTrollEgressConfig
        {
            Rules = new[]
            {
                new ThrottlingTrollRule
                {
                    // No more than 10 requests per second
                    LimitMethod = new FixedWindowRateLimitMethod
                    {
                        PermitLimit = 10,
                        IntervalInSeconds = 1,
                    }
                },
            }
        }
    )
);

1.然后这样使用:

[Function("QueueTrigger")]
[QueueOutput(QueueName)] 
public async Task<string> Run([QueueTrigger(QueueName)] string msg)
{
    // Making the call via an HttpClient scaffolded with ThrottlingTroll
    var response = await myThrottledHttpClient.GetAsync("https://my-http-endpoint");

    // If request rate limit is exceeded, myThrottledHttpClient will return this status by itself, _without_ making the actual call.
    if (response.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
    {
        // Just to reduce the load to the queue
        await Task.Delay(1000);

        // Re-queueing the same message
        return msg;
    }

    // Do whatever else needed with message and response

    return null;
}

配备ThrottlingTroll的HttpClient将限制自身。如果超过了限制,它将返回429 TooManyRequests *,而不进行实际的调用 *。当这种情况发生时,我们只需要将消息放回同一个队列。
假设你的Function有多个示例,考虑使用RedisCounterStore来维护所有示例的速率限制。

相关问题