pandas 如何在使用asyncio和aiohttp通过API接收数据时将DataFrame写入csv

pqwbnv8z  于 2023-02-20  发布在  其他
关注(0)|答案(2)|浏览(215)

我用Python和aiohttp构建了一个API Package 器模块,它允许我显著地加快发出多个GET请求和检索数据的过程,每个数据响应都被转换成一个PandasDataFrame。
使用asyncio,我执行如下操作:

import asyncio
from custom_module import CustomAioClient

id_list = ["123", "456"]

async def main():
    client = CustomAioClient()

    tasks = []
    for id in id_list:
        task = asyncio.ensure_future(client.get_latest_value(id=id))
        tasks.append(task)

    responses = await asyncio.gather(*tasks, return_exceptions=True)
    # Close the session
    await client.close_session()
    return responses

if __name__ == "__main__":
    asyncio.run(main())

这将返回一个Pandas Dataframe 列表,其中包含id_list中每个id的时间序列,我想将其保存为csv文件。
显然,我可以迭代列表并迭代地保存每个DataFrame,但这对我来说似乎效率很低。有什么方法可以改进吗?

    • 编辑**

我做了下面的操作来保存东西,这比仅仅迭代多个URL,获取数据并保存它要快得多。我怀疑这是否充分利用了异步功能。

import asyncio
from custom_module import CustomAioClient

async def fetch(client: CustomAioClient, id: str):
    df = await client.get_latest_value(id=id)
    df.to_csv(f"C:/{id}.csv")
    print(df)

async def main():
    client = CustomAioClient()
    id_list = ["123", "456"]

    tasks = []
    for id in id_list:
        task = asyncio.ensure_future(fetch(client=client, id=id))
        tasks.append(task)

    responses = await asyncio.gather(*tasks, return_exceptions=True)
    # Close the session
    await client.close_session()

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
icnyk63a

icnyk63a1#

查看asyncio article in Real Python末尾的示例
此示例采用的方法是设置函数调用以执行单个操作,包括获取数据并将其写入文件,然后创建bulk_method以处理多个请求。
此外,“with”关键字应用于需要特定设置和清理的操作,如打开文件或连接到服务器。

dfuffjeb

dfuffjeb2#

你可以声明一个简单的函数来下载DataFrame并将其保存到csv文件中,然后,你可以使用ThreadPoolExecutor和ayncio事件循环来调用这个函数,如下所示:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from custom_module import CustomAioClient

def download_to_csv(client: CustomAioClient, id: str) -> None:
    df = client.get_latest_value(id=id)
    df.to_csv(f"{id}.csv")

async def process(id_list: list[str]) -> None:
    client = CustomAioClient()
    with ThreadPoolExecutor() as executor:
        loop = asyncio.get_event_loop()
        tasks = [loop.run_in_executor(executor, download_to_csv(client, id)) for id in id_list]
    await asyncio.gather(*tasks)

id_list = ["123", "456"]

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(process(id_list))
    loop.run_until_complete(future)

相关问题