我有一个 awaitable 的列表,想把它们提交给事件循环(asyncio.AbstractEventLoop)运行,但我需要限制对第三方 API 的请求速率。
我希望避免在把 future 传递给循环之前同步等待,因为那样会阻塞事件循环。我有哪些选择?Semaphore 和线程池只能限制并发运行的数量,但这不是我的问题:我需要把请求调度限制为每秒 100 个,而单个请求耗时多久并不重要。
这是一个使用标准库的非常简洁的(非)工作示例,演示了问题所在。它应该以100/sec的速度进行节流,但实际上节流到了116.651/sec。在asyncio中,节流异步请求调度的最佳方式是什么?
示例代码(未能正确节流):
import asyncio
from threading import Lock
class PTBNL:
    """Toy request scheduler that throttles request dispatch with a token bucket.

    Each request gets an integer id; the id maps to an asyncio Future that is
    resolved when the request finishes (``end_req``).  Dispatch timing is
    decided by ``self.token_bucket.consume(1)``, which returns how long to
    wait before a request may run.
    """

    def __init__(self):
        self._req_id_seq = 0   # monotonically increasing request-id counter
        self._futures = {}     # req_id -> pending asyncio.Future
        self._results = {}     # req_id -> result accumulated before end_req
        self.token_bucket = TokenBucket()
        self.token_bucket.set_rate(100)

    def run(self, *awaitables):
        """Drive the event loop.

        With no arguments the loop runs forever; with one awaitable its result
        is returned; with several, they are gathered and the list of results
        is returned.  NOTE(review): must be called from outside a running
        loop — ``run_until_complete`` raises otherwise.
        """
        loop = asyncio.get_event_loop()
        if not awaitables:
            loop.run_forever()
        elif len(awaitables) == 1:
            return loop.run_until_complete(*awaitables)
        else:
            future = asyncio.gather(*awaitables)
            return loop.run_until_complete(future)

    def sleep(self, secs) -> bool:
        # Fixed: the original annotated the return type as the *value* True,
        # which is not a valid type; the method returns a bool.
        self.run(asyncio.sleep(secs))
        return True

    def get_req_id(self) -> int:
        """Return the next unused request id."""
        new_id = self._req_id_seq
        self._req_id_seq += 1
        return new_id

    def start_req(self, key):
        """Create and register the Future that ``end_req(key)`` will resolve."""
        loop = asyncio.get_event_loop()
        future = loop.create_future()
        self._futures[key] = future
        return future

    def end_req(self, key, result=None):
        """Resolve the Future registered for *key*; silently ignore unknown keys."""
        future = self._futures.pop(key, None)
        if future is not None:
            if result is None:
                result = self._results.pop(key, [])
            if not future.done():
                future.set_result(result)

    def req_data(self, req_id, obj):
        # Placeholder for the real (third-party API) work.
        self.req_data_end(req_id)

    def req_data_end(self, req_id):
        print(req_id, " has ended")
        self.end_req(req_id)

    async def req_data_async(self, obj):
        """Issue a single request and await its result."""
        req_id = self.get_req_id()
        future = self.start_req(req_id)
        self.req_data(req_id, obj)
        await future
        return future.result()

    async def req_data_batch_async(self, contracts):
        """Dispatch one request per contract, throttled by the token bucket.

        Returns ``(futures, measured_requests_per_second)``.  Fixed: the
        original left ``start`` unbound (NameError) when *contracts* was
        empty; an empty batch now reports a rate of 0.0.
        """
        loop = asyncio.get_event_loop()
        futures = []
        start = None
        for contract in contracts:
            req_id = self.get_req_id()
            futures.append(self.start_req(req_id))
            # consume() never blocks: it returns the delay (seconds) before
            # this request is allowed to run.
            nap = self.token_bucket.consume(1)
            if start is None:
                start = loop.time()
            loop.call_later(nap, self.req_data, req_id, contract)
        await asyncio.gather(*futures)
        if start is None:  # empty batch: nothing was dispatched
            return futures, 0.0
        elapsed = loop.time() - start
        return futures, len(contracts) / elapsed
class TokenBucket:
    """Thread-safe token bucket for rate-limiting request dispatch.

    ``consume(n)`` never blocks: it returns the number of seconds the caller
    should wait before performing the action (0 if it may run immediately).
    The token balance may go negative — that negative balance represents
    work already scheduled into the future.
    """

    def __init__(self):
        self.tokens = 0   # current capacity; may go negative (scheduled debt)
        self.rate = 0     # tokens replenished per second; 0 disables throttling
        self.burst = 0    # maximum tokens the bucket can hold
        # NOTE(review): get_event_loop() outside a running loop is deprecated
        # in modern Python; kept to match the rest of this example.
        self.last = asyncio.get_event_loop().time()
        self.lock = Lock()

    def set_rate(self, rate, burst=None):
        """Set the refill rate and, optionally, the burst capacity.

        *burst* defaults to *rate* (the original behaviour).  Starting with a
        full bucket permits an initial burst of ``rate`` immediate requests,
        which is exactly why a naive throughput measurement reads above the
        nominal rate.  Pass ``burst=1`` for even 1/rate-second spacing.
        """
        with self.lock:
            self.rate = rate
            self.burst = rate if burst is None else burst
            self.tokens = self.burst

    def consume(self, tokens):
        """Take *tokens* from the bucket; return seconds to wait before acting."""
        with self.lock:
            if not self.rate:
                return 0
            now = asyncio.get_event_loop().time()
            # Refill lazily from the elapsed time, capped at burst capacity.
            self.tokens = min(self.tokens + (now - self.last) * self.rate,
                              self.burst)
            self.last = now
            self.tokens -= tokens
            if self.tokens >= 0:
                return 0
            # Negative balance: the deficit divided by the refill rate is
            # how long until this consumption is covered.
            return -self.tokens / self.rate
if __name__ == '__main__':
    # Debug mode makes asyncio report scheduling problems (slow callbacks,
    # never-awaited coroutines) while we measure the throttle.
    asyncio.get_event_loop().set_debug(True)
    throttled_app = PTBNL()
    work_items = list(range(500))
    completed, rate = throttled_app.run(
        throttled_app.req_data_batch_async(work_items))
    print(completed)
    print(rate)
编辑:我在这里补充了一个使用信号量的简单示例 ThrottleTestApp,但它仍然无法限制执行速率:
import asyncio
import time
class ThrottleTestApp:
    """Semaphore-based throttling demo.

    A background task (``allow_requests``) tops the semaphore up to 100
    permits once per second; each request acquires one permit before running.

    Fixed relative to the original: ``do_request`` is a coroutine and was
    being *called* without ever being scheduled, so it never ran and nothing
    was throttled; and the ``allow_requests`` refill task was never started
    anywhere.  Both are now handled in ``req_data_batch_async``.
    """

    def __init__(self):
        self._req_id_seq = 0   # next request id
        self._futures = {}     # req_id -> pending asyncio.Future
        self._results = {}     # req_id -> result stored before end_req
        self.sem = asyncio.Semaphore()  # starts with a single permit

    async def allow_requests(self, sem):
        """Permit 100 requests per second.

        Run via ``asyncio.ensure_future(...)``; cancel the returned task to
        stop the routine.  asyncio.Semaphore exposes no public counter, so we
        peek at ``sem._value``; a wrapper with a ``current_value`` method
        would make this cleaner.
        """
        while True:
            while sem._value < 100:
                sem.release()
            await asyncio.sleep(1)  # or spread more evenly with a shorter
                                    # sleep and a smaller top-up amount

    async def do_request(self, req_id, obj):
        """Wait for a permit, then perform the request's work."""
        await self.sem.acquire()
        self.req_data(req_id, obj)

    def run(self, *awaitables):
        """Drive the event loop: forever, for one awaitable, or gathered."""
        loop = asyncio.get_event_loop()
        if not awaitables:
            loop.run_forever()
        elif len(awaitables) == 1:
            return loop.run_until_complete(*awaitables)
        else:
            future = asyncio.gather(*awaitables)
            return loop.run_until_complete(future)

    def sleep(self, secs: float = 0.02) -> bool:
        # Fixed: the original annotations `secs: [float]` and `-> True` are
        # not valid types; a plain float parameter returning bool is meant.
        self.run(asyncio.sleep(secs))
        return True

    def get_req_id(self) -> int:
        """Return the next unused request id."""
        new_id = self._req_id_seq
        self._req_id_seq += 1
        return new_id

    def start_req(self, key):
        """Create and register the Future that ``end_req(key)`` will resolve."""
        loop = asyncio.get_event_loop()
        future = loop.create_future()
        self._futures[key] = future
        return future

    def end_req(self, key, result=None):
        """Resolve the Future registered for *key*; silently ignore unknown keys."""
        future = self._futures.pop(key, None)
        if future is not None:
            if result is None:
                result = self._results.pop(key, [])
            if not future.done():
                future.set_result(result)

    def req_data(self, req_id, obj):
        # This is the method that "does" something.
        self.req_data_end(req_id)

    def req_data_end(self, req_id):
        print(req_id, " has ended")
        self.end_req(req_id)

    async def req_data_batch_async(self, objs):
        """Dispatch all requests, throttled by the semaphore, and await them.

        Starts the refill task, schedules each ``do_request`` as a real task
        (the original never scheduled them), then cancels the refill task
        once every request future has resolved.
        """
        throttler = asyncio.ensure_future(self.allow_requests(self.sem))
        futures = []
        start = time.time()
        for obj in objs:
            req_id = self.get_req_id()
            futures.append(self.start_req(req_id))
            asyncio.ensure_future(self.do_request(req_id, obj))
        await asyncio.gather(*futures)
        throttler.cancel()
        elapsed = time.time() - start
        # Guard against a zero reading from low-resolution clocks / empty input
        # (the original raised NameError on an empty batch).
        if elapsed > 0:
            print("Roughly %s per second" % (len(objs) / elapsed))
        return futures
if __name__ == '__main__':
    # Debug mode makes asyncio warn about never-awaited coroutines, which is
    # exactly the failure mode this demo exhibits.
    asyncio.get_event_loop().set_debug(True)
    demo = ThrottleTestApp()
    request_objs = list(range(10000))
    demo.run(demo.req_data_batch_async(request_objs))
3条答案
按热度按时间pdsfdshx1#
您可以通过实现漏桶算法(leaky bucket algorithm)来做到这一点:
注意,我们会随机地从存储桶中泄漏容量,因此不需要运行单独的异步任务来降低容量;相反,当测试足够剩余容量时,容量被泄漏。
请注意,等待容量的任务保存在有序字典中,当可能再次出现空闲容量时,第一个仍在等待的任务会被提前唤醒。
您可以将其用作上下文管理器;当桶是满块时尝试获取桶,直到再次释放足够的容量:
也可以直接调用 acquire();或者,你可以先简单地测试桶中是否还有剩余容量:
请注意,您可以通过增加或减少您“滴入”桶中的量来将某些请求计数为“较重”或“较轻”:
不过要小心这个;当混合大液滴和小液滴时,在最大速率或接近最大速率时,小液滴倾向于在大液滴之前流动,这是因为在存在用于较大液滴的空间之前存在用于较小液滴的足够的空闲容量的可能性较大。
演示:
桶在开始时被快速地突发填充,使得剩余的任务被更均匀地分布;每2秒就释放足够的容量用于处理另一任务。
最大猝发大小等于最大速率值,在上述演示中设置为5。如果不想允许猝发,请将最大速率设置为1,并将时间段设置为滴注之间的最短时间:
我抽时间把它打包成一个Python项目:https://github.com/mjpieters/aiolimiter
fafcakar2#
另一个解决方案--使用有界信号量--由同事、导师和朋友提供,如下所示:
仍然可以与 @Martijn 答案中相同的 async with bucket 代码一起使用。

avkwfej43#
一个简单的解决方案,用于管理每秒最大请求数和最大同时连接到API,我将其与盈透证券API一起使用。
输出示例: