所以我对python3的异步编程还很陌生。我正在使用types_aiobotocore_sqs和aiobotocore库创建aws sqs客户端。但是我发现了标题中提到的错误。
以下是代码实现
url.py file:
@api_view(["GET"])
def root_view(request):
async def run_async():
sqs_helper = await Helper.create(QueueName.TEST_QUEUE.value)
message = await sqs_helper.get_queue_url()
return message
response = asyncio.run(run_async())
return Response(str(response))
sqs_helper文件
class Helper:
def __init__(self, queue_name: str) -> None:
self.queue_name = queue_name
self.access_key_id = env("AWS_ACCESS_KEY_ID")
self.secret_access_key = env("AWS_SECRET_ACCESS_KEY")
self.region_name = env("REGION_NAME")
self.sqs = None
@classmethod
async def create(cls, queue_name):
instance = cls(queue_name)
await instance.setup()
return instance
async def setup(self):
self.sqs_client = await self.create_sqs_client()
self.sqs = Sqs(self.sqs_client, self.queue_name)
async def create_sqs_client(self):
session = get_session()
async with session.create_client('sqs', region_name=self.region_name, aws_access_key_id=self.access_key_id, aws_secret_access_key=self.secret_access_key) as client:
client: SQSClient
print("TYPE" + str(client))
return client
async def get_queue_url(self):
return await self.sqs.get_queue_url()
发起队列的SQS客户端
class Sqs:
def __init__(self, client: SQSClient, queue_name: str) -> None:
self.queue_name = queue_name
self._queue_url = ""
self.client = client
async def get_queue_url(self) -> str:
if not self._queue_url:
try:
response = await self.client.get_queue_url(QueueName=self.queue_name)
self._queue_url = response["QueueUrl"]
except ClientError as err:
if (
err.response.get("Error", {}).get("Code")
== "AWS.SimpleQueueService.NonExistentQueue"
):
raise QueueDoesNotExist(
f"Queue {self.queue_name} does not exist"
) from err
raise err
return self._queue_url
我得到了一个特别的错误,
response = await self.client.get_queue_url(name =self.queue_name)在第三个代码片段中的Sqs类中。
1条答案
按热度按时间btqmn9zl1#
当使用
async with session.create_client(...) as client
时,client
对象仅限于该上下文,并且在该上下文之外为None
。你应该直接分配它,以确保它在上下文之外是可访问的:确保手动处理客户端的关闭,因为您没有使用
async with
,它可以为您完成此操作。