Catching Scrapy exceptions when crawling from Airflow

x7rlezfr  posted on 2022-12-26 in Other

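I'm trying to catch exceptions raised in the spider so that the task instance is marked as failed. Currently the task completes and is marked as successful. I call crawl() from a PythonOperator in Airflow, as follows: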

with DAG(
    'MySpider',
    default_args=default_args,
    schedule_interval=None) as dag:

    t1 = python_task = PythonOperator(
        task_id="crawler_task",
        python_callable=run_crawler,
        op_kwargs=dag_kwargs
    )

Here is my run_crawler() method:

def run_crawler(**kwargs):
    project_settings = set_project_settings({
        'FEEDS': {
            f'{kwargs["bucket"]}%(time)s.{kwargs["format"]}': {
                'format': kwargs["format"],
                'encoding': 'utf8',
                'store_empty': kwargs["store_empty"]
            }
        }
    })
    
    print("Project settings: ")
    pprint(project_settings.attributes.items())

    set_connection("airflow", kwargs["gcs_connection_id"])
    
    process = CrawlerProcess(project_settings)
    process.crawl(spider.MySpider)

    print("Starting crawler...")
    process.start()

At runtime I hit a GCS credentials problem, which raises the following exception:

google.auth.exceptions.DefaultCredentialsError: The file /tmp/file_my_credentials.json does not have a valid type. Type is None, expected one of ('authorized_user', 'service_account', 'external_account', 'external_account_authorized_user', 'impersonated_service_account', 'gdch_service_account').

{logging_mixin.py:115} WARNING - [scrapy.statscollectors] INFO: Dumping Scrapy stats:
{'downloader/request_bytes': 21087,
 'downloader/request_count': 68,
 'downloader/request_method_count/GET': 68,
 'downloader/response_bytes': 1863876,
 'downloader/response_count': 68,
 'downloader/response_status_count/200': 68,
 'elapsed_time_seconds': 25.647386,
 'feedexport/failed_count/GCSFeedStorage': 1,
 'httpcompression/response_bytes': 9212776,
 'httpcompression/response_count': 68,
 'item_scraped_count': 66,
 'log_count/DEBUG': 136,
 'log_count/ERROR': 1,
 'log_count/INFO': 10,
 'log_count/WARNING': 3,
 'memusage/max': 264441856,
 'memusage/startup': 264441856,
 'request_depth_max': 1,
 'response_received_count': 68,
 'scheduler/dequeued': 68,
 'scheduler/dequeued/memory': 68,
 'scheduler/enqueued': 68,
 'scheduler/enqueued/memory': 68,
[2032-13-13, 09:04:28 UTC] {engine.py:389} INFO - Spider closed (finished)
[2032-13-13, 09:04:28 UTC] {logging_mixin.py:115} WARNING - 
[scrapy.core.engine] INFO: Spider closed (finished)
[2032-13-13, 09:04:28 UTC] {python.py:173} INFO - Done. Returned value was: None
[2032-13-13, 09:04:28 UTC] {taskinstance.py:1408} INFO - Marking task as SUCCESS. dag_id=MySpider, task_id=crawler_task, execution_date=2032-13-13, start_date=2032-13-13, end_date=2032-13-13
[2032-13-13, 09:04:28 UTC] {local_task_job.py:156} INFO - Task exited with return code 0
[2032-13-13, 09:04:28 UTC] {local_task_job.py:279} INFO - 0 downstream tasks scheduled from follow-on schedule check

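As you can see, the task itself is marked as SUCCESS despite the exception. Is there a way to catch it so that the task is marked as failed and we can track it in the Airflow (Composer) UI?
Thanks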

Answer #1, by 7fhtutme

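I don't understand why the exception does not interrupt the task in this case.
You can add a try/except to the run_crawler method and raise your own exception in the except block: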

import logging

def run_crawler(**kwargs):
    class CustomException(Exception):
        pass

    try:
        project_settings = set_project_settings({
            'FEEDS': {
                f'{kwargs["bucket"]}%(time)s.{kwargs["format"]}': {
                    'format': kwargs["format"],
                    'encoding': 'utf8',
                    'store_empty': kwargs["store_empty"]
                }
            }
        })

        print("Project settings: ")
        pprint(project_settings.attributes.items())

        set_connection("airflow", kwargs["gcs_connection_id"])

        process = CrawlerProcess(project_settings)
        process.crawl(spider.MySpider)

        print("Starting crawler...")
        process.start()
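    except Exception as err:
        # Pass err as a %s argument; extra args without a placeholder break log formatting
        logging.error("Error in the Airflow task: %s", err)
        # Chain the original exception so its traceback is kept in the Airflow log
        raise CustomException("Error in the Airflow task") from err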

In this case, when your custom exception is raised, it interrupts the Airflow task and marks it as failed.
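One caveat: in the log from the question, the callable returned normally ("Returned value was: None") even though an error was logged, which suggests Scrapy handled the exception internally. If that is the case, a try/except around process.start() may never see it. A possible complement is to inspect the crawl stats after the run and raise from there. The sketch below is only an illustration under that assumption: `CrawlFailedError` and `raise_if_crawl_failed` are hypothetical names, and the stats keys checked are the ones that appear in the stats dump above.

```python
from typing import Mapping


class CrawlFailedError(RuntimeError):
    """Hypothetical exception raised to make the Airflow task fail."""


def raise_if_crawl_failed(stats: Mapping[str, object]) -> None:
    """Raise CrawlFailedError if the Scrapy stats report any error.

    Looks at `log_count/ERROR` and at every `feedexport/failed_count/*`
    key, both of which appear in the stats dump from the question.
    """
    error_count = stats.get("log_count/ERROR", 0) or 0
    failed_exports = {
        key: value
        for key, value in stats.items()
        if key.startswith("feedexport/failed_count/") and value
    }
    if error_count or failed_exports:
        raise CrawlFailedError(
            f"Crawl reported {error_count} logged error(s); "
            f"failed feed exports: {failed_exports}"
        )


# Assumed usage inside run_crawler(), keeping a handle on the crawler
# so that its stats can be read once the crawl has finished:
#
#     crawler = process.create_crawler(spider.MySpider)
#     process.crawl(crawler)
#     process.start()
#     raise_if_crawl_failed(crawler.stats.get_stats())
```

Because the exception is raised from the python_callable itself, Airflow would mark the task instance as failed. Whether a single logged error should fail the whole task is a design choice; checking only the `feedexport/failed_count/*` keys would be a stricter filter.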
