如何使用Pandas apply()函数对API应用异步调用

lstz6jyr  于 2023-04-19  发布在  其他
关注(0)|答案(3)|浏览(199)

我有一个~ 14,000行的dataframe,并试图通过调用API将一些数据填充到新列中。下面的代码检索预期的响应,但是,似乎每次迭代都等待响应转到下一行。
下面是函数:

def market_sector_des(isin):
isin = '/isin/' + isin
return blp.bdp(tickers = isin, flds = ['market_sector_des']).iloc[0]

我正在使用xbbg调用Bloomberg API。
.apply()函数返回预期的响应,

df['new_column'] = df['ISIN'].apply(market_sector_des)

但是每个响应需要大约2秒,在14,000行中大约需要8小时。
有没有什么方法可以让这个apply函数异步,这样所有的请求都可以并行发送?我已经看到dask作为一个替代方案,但是,我在使用它时也遇到了问题。

bvjxkvbb

bvjxkvbb1#

如果以上正是您想要做的,那么可以通过创建一个包含要发送的ticker语法的列,然后将该列作为一个序列通过blpapi传递来实现

df['ISIN_NEW'] = '/isin/' + df['ISIN']
isin_new = pd.unique(df['ISIN_NEW'].dropna())
mktsec_df = blp.bdp(tickers = isin_new, flds = ['market_sector_des'])

然后,您可以将新创建的df连接到现有的df,以便完整地获得列中的数字。

newdf = pd.merge(df, mktsec_df, how='left', left_on = 'ISIN_NEW', right_index = True )

这应该会导致一个单一的电话,这将理想地下降到不到一分钟的速度。如果这工作让我知道。

5q4ezhmt

5q4ezhmt2#

你可以使用multiprocessing来并行化API调用。将你的Series划分为THREAD块,然后每个块运行一个进程:
main.py

import multiprocessing as mp
import pandas as pd
import numpy as np
import parallel_tickers

THREADS = mp.cpu_count() - 1

# df = your_dataframe_here
split = np.array_split(df['ISIN'], THREADS)
with mp.Pool(THREADS) as pool:
    data = pool.map(proxy_func, split)

df['new_column'] = pd.concat(data)

parallel_tickers.py

import pandas as pd
from xbbg import blp

def market_sector_des(isin):
    isin = '/isin/' + isin
    return blp.bdp(tickers = isin, flds = ['market_sector_des']).iloc[0]

def proxy_func(sr):
    return pd.Series([market_sector_des(isin) for isin in sr], index=sr.index)

编辑:使用另一个模块的mp功能

vdzxcuhz

vdzxcuhz3#

这可以使用asyncio语法来完成。工作代码如下:

import asyncio
async def market_sector_des_async(isin):
    isin = '/isin/' + isin
    return blp.bdp(tickers = isin, flds=['market_sector_des']).iloc[0]['market_sector_des']

async def main():
    tasks = [asyncio.create_task(market_sector_des_async(isin)) for isin in df['ISIN']]
    results = await asyncio.gather(*tasks)
    df['market_sector_des'] = results

await main()

相关问题