python-3.x 未使用asyncio/aiohttp完成响应负载

iq0todco  于 2023-02-06  发布在  Python
关注(0)|答案(2)|浏览(205)

我编写了一个Python 3.7脚本,它使用多个对象异步创建Salesforce批量API (v45.0)作业/批处理,每个对象由单个SOQL语句查询,等待批处理完成,完成后下载(流式传输)结果到服务器,进行一些数据转换,然后最后将结果同步上传到SQL Server 2016 SP1 (13.0.4560.0)。我已经用这个成功地试运行了很多次,并认为它工作得很完美,然而,我最近开始间歇性地收到以下错误,我有点不知所措如何修复,因为有很少的报告/解决方案,这在网络上:
客户端异常。客户端有效负载错误:响应负载未完成
示例代码片段:

import asyncio,aiohttp,aiofiles
from simple_salesforce import Salesforce
from xml.etree import ElementTree

#Establish a session using the simple_salesforce module
sf = Salesforce(username=username,
                password=password,
                security_token=securityToken,
                organizationId=organizationId)
sfAPIURL = 'https://myinstance.salesforce.com/services/async/45.0/job/'
sfDataPath = 'C:/Salesforce/Data/'

#Dictionary to store information for the object/job/batch while the script is executing
objectDictionary = 
{'Account': {'job':
                {'batch': {'id': '8596P00000ihwpJulI','results': ['8596V00000Bo9iU'],'state': 'Completed'},
             'id': '8752R00000iUjtReqS'},
             'soql': 'select Id,Name from Account'},

 'Contact': {'job':
                {'batch': {'id': '9874G00000iJnBbVgg','results': ['7410t00000Ao9vp'],'state': 'Completed'},
             'id': '8800o00000POIkLlLa'},
             'soql': 'select Id,Name from Contact'}}

async def retrieveResults(jobId, batchId, sfObject):
    headers = {"X-SFDC-Session": sf.session_id, 'Content-Encoding': 'gzip'}
    async with aiohttp.ClientSession() as session:
        async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result', headers=headers) as r:
            data = await r.text()
            batchResults = ElementTree.fromstring(data) #list of batch results
            for resultID in batchResults:
                async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result/{resultID.text}', headers=headers, timeout=None) as r:
                    async with aiofiles.open(f'{sfDataPath}{sfObject}_TEMP_JOB_{jobId}_BATCH_{batchId}_RESULT_{resultID.text}.csv', 'wb') as outfile: #save in temporary file for manipulation later
                        while True:
                            chunk = await r.content.read(81920)
                            if not chunk:
                                break
                            await outfile.write(chunk)

async def asyncDownload():
    await asyncio.gather(*[retrieveResults(objectDictionary[sfObject]['job']['id'], objectDictionary[sfObject]['job']['batch']['id'], sfObject) for sfObject in objectDictionary])

if __name__ == "__main__":
    asyncio.run(asyncDownload())

回溯(错误行与上面的代码段不匹配):
追溯(最近调用最后调用):
www.example.com中的文件"C:\Code\salesforce.py",第252行(异步下载())asyncio.run(asyncDownload())
文件"C:\程序文件\Python37\lib\asyncio\runners.py",第43行,在运行返回www.example.com_until_complete(main)中loop.run_until_complete(main)
文件"C:\程序文件\Python37\lib\asyncio\base_events.py",第584行,在run_until_complete中返回将来的结果()
文件"C:\Code\salesforce.py",第241行,在异步下载中等待异步收集(*[检索结果(对象字典[sfObject] ['job '] ['id'],对象字典[sfObject] ['job '] ['batch'] ['id '],sfObject)用于对象字典中的sfObject])
文件"C:\Code\salesforce.py",第183行,检索结果块=等待www.example.com(81920)r.content.read(81920)
文件"C:\程序文件\Python37\库\站点包\aiohttp\streams.py",第369行,在读取等待自身。_等待("读取")
文件"C:\程序文件\Python37\lib\站点包\aiohttp\streams.py",第297行,在等待等待服务器中
客户端异常。客户端有效负载错误:响应负载未完成
问题的根源似乎始于r.content.read(81920),它应该是81920字节块的流数据,但这是我所能得到的最远的数据。
我不认为这是我这边的网络问题,因为在此服务器上有其他连接到外部源的小作业,在此作业运行时可以顺利完成。有人知道这是怎么回事吗?
谢谢大家!

  • -编辑:*

我尝试使用iter_any()而不是read(),但仍然得到相同的错误...

async for data in r.content.iter_any():
    await outfile.write(data)

我尝试了readline(),但仍然得到相同的错误...

async for line in r.content.readline():
    await outfile.write(line)

我已经在错误处理代码中加入了一些重试功能(没有包含在原来的问题中),这最终允许作业完成。负载错误仍然发生,这仍然是主要问题,但重试下载已经是一个成功的解决方案。如果有人能够提供进一步的信息,问题仍然存在。

zxlwwiss

zxlwwiss1#

您好,您是否尝试过将await asyncio.sleep(0)插入:

...
                    while True:
                        chunk = await r.content.read(81920)
                        await asyncio.sleep(0)
                        if not chunk:
                            break
                        await outfile.write(chunk)
                    ...
pw9qyyiw

pw9qyyiw2#

我在Amazon Lambda中遇到了这个错误(这是在请求时抛出的)
等待asyncio.gather(* 任务)#任务,如asyncio. sure_future()
解决方案,修复生成环境:

FROM amazonlinux:2 AS

FROM lambci/lambda:build-python3.8

我想问题是.so文件或者其他底层的东西,库内部用来管理协程的,与lambda环境不兼容,因此,构建正确的docker库可以解决这个问题。

相关问题