json 在Airflow中使用GoogleCloudStorageToBigQuery运算符中的Schema_Object

2uluyalo  于 2023-02-10  发布在  Go
关注(0)|答案(1)|浏览(117)

我想通过气流DAG将JSON格式的GCS文件加载到BQ表中。因此,我使用了GoogleCloudStorageToBigQueryOperator。此外,为了避免使用autodetect选项,我创建了一个存储在GCS桶中的Schema JSON文件,我将JSON原始数据文件用作schema_object
下面是JSON架构文件:

[{"name": "id", "type": "INTEGER", "mode": "NULLABLE"},{"name": "description", "type": "INTEGER", "mode": "NULLABLE"}]

对于我的JSON原始数据文件,它看起来像这样(新行分隔的JSON文件):

{"description":"HR Department","id":9}
{"description":"Restaurant Department","id":10}

下面是我的操作符:

gcs_to_bq = GoogleCloudStorageToBigQueryOperator(
        task_id=table_name + "_gcs_to_bq",
        bucket=bucket_name,
        bigquery_conn_id="bigquery_default",
        google_cloud_storage_conn_id="google_cloud_storage_default",
        source_objects=[table_name + "/{{ ds_nodash }}/data_json/*.json"],
        schema_object=table_name+"/{{ ds_nodash }}/data_json/schema_file.json",
        allow_jagged_rows=True,
        ignore_unknown_values=True,
        source_format="NEWLINE_DELIMITED_JSON",
        destination_project_dataset_table=project_id
        + "."
        + data_set
        + "."
        + table_name,
        write_disposition="WRITE_TRUNCATE",
        create_disposition="CREATE_IF_NEEDED",
        dag=dag,
    )

我得到的错误是:

google.api_core.exceptions.BadRequest: 400 Error while reading data, error message: Failed to parse JSON: No object found when new array is started.; BeginArray returned false; Parser terminated before end of string File: schema_file.json

你能帮我解决这个问题吗?先谢了。

x0fgdtte

x0fgdtte1#

我看到两个问题:

  • 您的BigQuery表架构不正确,description列的类型为INTEGER而不是STRING。您必须将其设置为STRING
  • 您正在使用旧的Airflow版本。在最近的版本中,通常默认情况下源对象从bucket参数中指定的存储桶中检索对象。对于旧版本,我不确定此行为。您可以设置完整路径以检查它是否解决了您的问题,例如:schema_object='gs://test-bucket/schema.json'

相关问题