我正在尝试通过GoogleCloudStorageToBigQueryOperator任务将CSV文件从Google云存储加载到空的Google Big Query表中。
t8 = GoogleCloudStorageToBigQueryOperator(
task_id='gcs_send_dim_report',
bucket='report',
source_objects=[
'gs://report/test-dim-report/dim_report_{{ ds_nodash }}.csv'
],
schema_fields=['filename_pdf','filename_png', 'week_date', 'code'],
skip_leading_rows=1,
source_format = 'CSV',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
destination_project_dataset_table='xxxx-yyyy:report.test_dim_report_{{ ds_nodash }}',
dag=dag
)
要加载的表已经在Big Query中定义了schema,即使如此,为了解决这个错误,我添加了参数schema_fields
,其中包含我正在使用的CSV的列。查看任务日志,我首先遇到了以下依赖性错误:
from google.appengine.api import memcache
[2018-06-22 05:58:49,650] {base_task_runner.py:98} INFO - Subtask: ImportError: No module named 'google.appengine'
[2018-06-22 05:58:49,650] {base_task_runner.py:98} INFO - Subtask:
[2018-06-22 05:58:49,651] {base_task_runner.py:98} INFO - Subtask: During handling of the above exception, another exception occurred:
[2018-06-22 05:58:49,651] {base_task_runner.py:98} INFO - Subtask:
[2018-06-22 05:58:49,651] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-06-22 05:58:49,652] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python3.5/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 33, in <module>
[2018-06-22 05:58:49,652] {base_task_runner.py:98} INFO - Subtask: from oauth2client.contrib.locked_file import LockedFile
[2018-06-22 05:58:49,652] {base_task_runner.py:98} INFO - Subtask: ImportError: No module named 'oauth2client.contrib.locked_file'
在日志的末尾,将显示最后一个错误:
Exception: BigQuery job failed. Final error was: {'reason': 'invalid', 'message': 'Empty schema specified for the load job. Please specify a schema that describes the data being loaded.'}.
我正在寻找一些解决该错误的方法,以便成功地将我的CSV文件加载到Google Big Query
2条答案
按热度按时间dy1byipe1#
有两种方法可以实现。这些都来自代码文档,还有这个初始位:
用于BigQuery表的架构可以通过两种方式之一指定。您可以直接传入架构字段,也可以将运算符指向Google云存储对象名称。Google云存储中的对象必须是包含架构字段的JSON文件。
GoogleCloudStorageToBigQueryOperator
文档中的说明正确定义schema_fields
。此处提供了如何定义方案的示例:https://cloud.google.com/bigquery/docs/schemas如果设置,则按此处定义的模式字段列表:当https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load源格式为“DATASTORE_BACKUP”时,不应设置www.example.com。
示例(来自示例链接):
schema_object
。如果设置,则为GCS对象路径,指向包含表架构的.json文件。(模板化)
elcex8rz2#
如@dboshardy所述,@tobi6提供的答案会导致以下错误:
错误-“SchemaField”类型的对象不可JSON序列化
如错误所示,SchemaField不是JSON可序列化类,而参数schema_fields需要JSON可序列化对象。
解决方案是将模式作为字典列表传递,如Airflow文档所示:https://airflow.apache.org/docs/stable/_api/airflow/contrib/operators/bigquery_operator/index.html#airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator
示例(基于OP问题):
针对类似的问题,通过Google Cloud Composer(airflow v1.10.6)成功测试了所提供的解决方案。