scrapy是如何将第三方库(如管道中的aiomysql)的协程方法与合并结合起来存储数据的?

dwthyt8l  于 12个月前  发布在  Mysql
关注(0)|答案(1)|浏览(158)

我在使用scrapy的协程功能的时候,有一个场景是需要用aiomysql来存储item的数据,但是偶尔Task was destroyed but it is pending也会报错,也就是说有时候它可以很快的并且正常的运行,但是大部分都会报错,我对协程不太了解,所以不知道是不是aiomysql库的问题,我写的scrapy代码有问题,或者别的什么。
下面是示例代码,这只是一个粗略的例子:

# TWISTED_REACTOR has been enabled
import aiomysql
from twisted.internet.defer import Deferred

def as_deferred(f):
    """
    transform a Twisted Deferred to an Asyncio Future
    Args:
        f: async function

    Returns:
        1).Deferred
    """
    return Deferred.fromFuture(asyncio.ensure_future(f))

class AsyncMysqlPipeline:
    def __init__(self):
        self.loop = asyncio.get_event_loop()

    def open_spider(self, spider):
        return as_deferred(self._open_spider(spider))

    async def _open_spider(self, spider):
        self.pool = await aiomysql.create_pool(
            host="localhost",
            port=3306,
            user="root",
            password="pwd",
            db="db",
            loop=self.loop,
        )
    
    async def process_item(self, item, spider):
        async with self.pool.acquire() as aiomysql_conn:
            async with aiomysql_conn.cursor() as aiomysql_cursor:
                # Please ignore this "execute" line of code, it's just an example
                await aiomysql_cursor.execute(sql, tuple(new_item.values()) * 2)
                await aiomysql_conn.commit()
        return item

    async def _close_spider(self):
        await self.pool.wait_closed()

    def close_spider(self, spider):
        self.pool.close()
        return as_deferred(self._close_spider())

字符串
据我搜索的其他类似问题了解,asyncio.create_task存在被垃圾回收机制自动回收,然后随机导致task was destroyed but it is pending异常的问题,以下是相应的参考链接:
1.浏览器:Use strong references for free-flying tasks · Issue #91887

  1. Corotine的exceptfinally块中的Context不正确·问题编号93740
    1.修复:prevent undone task be killed by gc by ProgramRipper · Pull Request #48
    不知道是不是因为这个原因,我解决不了我的问题,不知道有没有人遇到过类似的错误,也希望有人能给予一个例子,在pipelines中使用协程存储数据,不限制任何库或方法的使用。
    附加我的操作环境:
  • Scrape version:2.8.0
  • aiomysql版本:0.1.1
  • 操作系统:Win10 and Centos 7.5
  • python版本:3.8.5

我的英语很差,希望我能把我的问题描述清楚。
我尝试使用aiomysql协程中的scrapy管道来存储数据,期望能正常运行,但实际情况会偶尔出现“task was destroyed but it is pending”的错误。

ngynwnxp

ngynwnxp1#

原因如我列出的引用链接所示,可以通过保证协程任务的强引用来解决。
解决方案类似于以下内容:

running_tasks = set()
# [...]
task = asyncio.create_task(some_function())
running_tasks.add(task)
task.add_done_callback(lambda t: running_tasks.remove(t))

字符串

相关问题