我尝试捕获spider上发生的异常,以便将任务示例标记为失败。当前任务已完成并标记为成功。我从Airflow中的PythonOperator
调用crawl()
,如下所示:
with DAG(
'MySpider',
default_args=default_args,
schedule_interval=None) as dag:
t1 = python_task = PythonOperator(
task_id="crawler_task",
python_callable=run_crawler,
op_kwargs=dag_kwargs
)
下面是我的run_crawler()
方法:
def run_crawler(**kwargs):
project_settings = set_project_settings({
'FEEDS': {
f'{kwargs["bucket"]}%(time)s.{kwargs["format"]}': {
'format': kwargs["format"],
'encoding': 'utf8',
'store_empty': kwargs["store_empty"]
}
}
})
print("Project settings: ")
pprint(project_settings.attributes.items())
set_connection("airflow", kwargs["gcs_connection_id"])
process = CrawlerProcess(project_settings)
process.crawl(spider.MySpider)
print("Starting crawler...")
process.start()
在运行时,我遇到GCS凭据问题,这导致了一个异常,如下所示:
google.auth.exceptions.DefaultCredentialsError: The file /tmp/file_my_credentials.json does not have a valid type. Type is None, expected one of ('authorized_user', 'service_account', 'external_account', 'external_account_authorized_user', 'impersonated_service_account', 'gdch_service_account').
{logging_mixin.py:115} WARNING - [scrapy.statscollectors] INFO: Dumping Scrapy stats:
{'downloader/request_bytes': 21087,
'downloader/request_count': 68,
'downloader/request_method_count/GET': 68,
'downloader/response_bytes': 1863876,
'downloader/response_count': 68,
'downloader/response_status_count/200': 68,
'elapsed_time_seconds': 25.647386,
'feedexport/failed_count/GCSFeedStorage': 1,
'httpcompression/response_bytes': 9212776,
'httpcompression/response_count': 68,
'item_scraped_count': 66,
'log_count/DEBUG': 136,
'log_count/ERROR': 1,
'log_count/INFO': 10,
'log_count/WARNING': 3,
'memusage/max': 264441856,
'memusage/startup': 264441856,
'request_depth_max': 1,
'response_received_count': 68,
'scheduler/dequeued': 68,
'scheduler/dequeued/memory': 68,
'scheduler/enqueued': 68,
'scheduler/enqueued/memory': 68,
[2032-13-13, 09:04:28 UTC] {engine.py:389} INFO - Spider closed (finished)
[2032-13-13, 09:04:28 UTC] {logging_mixin.py:115} WARNING -
[scrapy.core.engine] INFO: Spider closed (finished)
[2032-13-13, 09:04:28 UTC] {python.py:173} INFO - Done. Returned value was: None
[2032-13-13, 09:04:28 UTC] {taskinstance.py:1408} INFO - Marking task as SUCCESS. dag_id=MySpider, task_id=crawler_task, execution_date=2032-13-13, start_date=2032-13-13, end_date=2032-13-13
[2032-13-13, 09:04:28 UTC] {local_task_job.py:156} INFO - Task exited with return code 0
[2032-13-13, 09:04:28 UTC] {local_task_job.py:279} INFO - 0 downstream tasks scheduled from follow-on schedule check
正如你所看到的,即使有这个异常,任务本身被标记为“成功”。有没有可能捕捉它,以便标记为失败,然后我们可以遵循它的气流( composer )界面?
谢谢
1条答案
按热度按时间7fhtutme1#
我不明白为什么在这种情况下异常没有中断任务。
您可以在
run_crawler
方法中添加一个try
except
,然后在except
块中引发您自己的异常:在这种情况下,当您的自定义异常将被引发时,它将中断
Airflow
并将其标记为失败。