我想通过气流DAG将JSON格式的GCS文件加载到BQ表中。因此,我使用了GoogleCloudStorageToBigQueryOperator
。此外,为了避免使用autodetect
选项,我创建了一个存储在GCS桶中的Schema JSON文件,我将JSON原始数据文件用作schema_object
。
下面是JSON架构文件:
[{"name": "id", "type": "INTEGER", "mode": "NULLABLE"},{"name": "description", "type": "INTEGER", "mode": "NULLABLE"}]
对于我的JSON原始数据文件,它看起来像这样(新行分隔的JSON文件):
{"description":"HR Department","id":9}
{"description":"Restaurant Department","id":10}
下面是我的操作符:
gcs_to_bq = GoogleCloudStorageToBigQueryOperator(
task_id=table_name + "_gcs_to_bq",
bucket=bucket_name,
bigquery_conn_id="bigquery_default",
google_cloud_storage_conn_id="google_cloud_storage_default",
source_objects=[table_name + "/{{ ds_nodash }}/data_json/*.json"],
schema_object=table_name+"/{{ ds_nodash }}/data_json/schema_file.json",
allow_jagged_rows=True,
ignore_unknown_values=True,
source_format="NEWLINE_DELIMITED_JSON",
destination_project_dataset_table=project_id
+ "."
+ data_set
+ "."
+ table_name,
write_disposition="WRITE_TRUNCATE",
create_disposition="CREATE_IF_NEEDED",
dag=dag,
)
我得到的错误是:
google.api_core.exceptions.BadRequest: 400 Error while reading data, error message: Failed to parse JSON: No object found when new array is started.; BeginArray returned false; Parser terminated before end of string File: schema_file.json
你能帮我解决这个问题吗?先谢了。
1条答案
按热度按时间x0fgdtte1#
我看到两个问题:
BigQuery
表架构不正确,description
列的类型为INTEGER
而不是STRING
。您必须将其设置为STRING
Airflow
版本。在最近的版本中,通常默认情况下源对象从bucket
参数中指定的存储桶中检索对象。对于旧版本,我不确定此行为。您可以设置完整路径以检查它是否解决了您的问题,例如:schema_object='gs://test-bucket/schema.json'