我有一个使用aiohtp和aiobotocore来处理AWS中资源的项目。我正在尝试测试与AWS S3一起工作的类,我正在使用moto来模拟AWS。Mocking在使用同步代码的示例中工作得很好(来自moto docs的示例)
import boto3
from moto import mock_s3
class MyModel(object):
def __init__(self, name, value):
self.name = name
self.value = value
def save(self):
s3 = boto3.client('s3', region_name='us-east-1')
s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)
def test_my_model_save():
with mock_s3():
conn = boto3.resource('s3', region_name='us-east-1')
conn.create_bucket(Bucket='mybucket')
model_instance = MyModel('steve', 'is awesome')
model_instance.save()
body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")
assert body == 'is awesome'
字符串
然而,在重写此以使用aiobotocore mocking之后,它就不起作用了--在我的示例中,它连接到了真实的AWS S3。
import aiobotocore
import asyncio
import boto3
from moto import mock_s3
class MyModel(object):
def __init__(self, name, value):
self.name = name
self.value = value
async def save(self, loop):
session = aiobotocore.get_session(loop=loop)
s3 = session.create_client('s3', region_name='us-east-1')
await s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)
def test_my_model_save():
with mock_s3():
conn = boto3.resource('s3', region_name='us-east-1')
conn.create_bucket(Bucket='mybucket')
loop = asyncio.get_event_loop()
model_instance = MyModel('steve', 'is awesome')
loop.run_until_complete(model_instance.save(loop=loop))
body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")
assert body == 'is awesome'
型
所以我在这里的假设是moto不能与aiobotocore正常工作。如果我的源代码看起来像第二个例子中的那样,我如何有效地模拟AWS资源?
7条答案
按热度按时间k97glaaz1#
来自
moto
的模拟不起作用,因为它们使用同步API。但是,您可以启动moto
服务器并配置aiobotocore
连接到此测试服务器。Take a look on aiobotocore tests作为灵感。2g32fytz2#
使用AWS的编译器应该可以做到这一点。下面是我在一个tornado应用程序中为AWS读取操作所做的操作:
字符串
应该是类似的写操作。希望这将引导你在正确的方向。
更多信息:https://botocore.amazonaws.com/v1/documentation/api/latest/reference/stubber.html
pes8fvy93#
我认为塞巴斯蒂安Brestins的答案应该是公认的。我将发布这个新的答案,因为自从它发布以来,有些事情发生了变化,例如,python 3.8现在支持python测试用例,aioboto3客户端现在是上下文管理器。
使用Python 3.8的最小示例如下所示:
字符串
bvuwiixz4#
不幸的是,这不是一个完整的答案,但有一个pull request添加了这个功能,已经开放了6个月:https://github.com/aio-libs/aiobotocore/pull/766
当我处理类似的问题时,我已经为sync对象手工编写了“Pencc” Package 器。
f1tvaqid5#
下面是aiobotocore中没有pytest的mock_server.py:
字符串
n8ghc7c16#
我们可以使用moto[server]创建S3服务器,然后使用它创建一个类似于aioboto3的pytest fixture
字符串
然后
patch('aiobotocore.AioSession.create_client')
return_value和aiobotocore.get_session().create_client('s3', region_name='us-east-1', end_point_url=s3_server)
如下所示型
yzckvree7#
下面是一个fixture,它使用
moto
包为您完成此任务:个字符
你可以在测试中包含这个fixture,它将使aws为这个测试“工作”。