csv: How can I make load_table_from_dataframe and load_table_from_file behave the same for BigQuery load jobs on data with missing columns?

8tntrjer · published 2023-05-11 · in Other

If I create an empty table in BigQuery where one column has a default value, then load data that is missing that column with load_table_from_dataframe, the default value is filled in. But when I try the same thing with load_table_from_file, I get a BadRequest error.
How can I get the same behavior with load_table_from_file?

Example

# Create an empty table with a column which has a default value

dataset_id = 'test'
table_id = 'test'

cols = ['source', 'ts']
dataframe = pd.DataFrame(columns=cols)

bq_client = bigquery.Client()

schema = [
    bigquery.SchemaField('source', 'STRING', 'NULLABLE'),
    bigquery.SchemaField('ts', 'TIMESTAMP', 'NULLABLE', default_value_expression='CURRENT_TIMESTAMP()')
]

dataset_ref = bq_client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)

job_config = bigquery.LoadJobConfig(
    schema=schema
)

job = bq_client.load_table_from_dataframe(
    dataframe, table_ref, job_config=job_config
)
job.result()

Load from a dataframe

dataframe = pd.DataFrame({'source':['dataframe']})

job_config = bigquery.LoadJobConfig(
    write_disposition='WRITE_APPEND'
)

job = bq_client.load_table_from_dataframe(
    dataframe, table_ref, job_config=job_config
)

job.result()

Result: the row loads successfully and ts is populated with its default value.

Load the table from a csv

dataframe = pd.DataFrame({'source':['csv']})

job_config = bigquery.LoadJobConfig(
    source_format='CSV',
    skip_leading_rows=1
)

dataframe.to_csv('temp.csv', index=False)
with open('temp.csv', "rb") as source_file:
    job = bq_client.load_table_from_file(
        source_file, table_ref, job_config=job_config
    )

job.result()

Result:

BadRequest: 400 Error while reading data, error message: CSV table references column position 1, but line starting at position:0 contains only 1 columns.

How can I match the from_dataframe behavior with from_file? For context, I want to use this approach to create an updated_at field containing each row's ingestion time.

Update

If I add allow_jagged_rows=True to the CSV job config, the data does load, but ts ends up NULL rather than the column's default value.
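One possible workaround (an assumption on my part, based on BigQuery applying column defaults to columns that are absent from a load job's schema rather than to jagged CSV cells): pass an explicit schema to the CSV load job that lists only the columns actually present in the file, so ts is treated as missing instead of as a short row. The helper name `load_csv_with_partial_schema` is hypothetical; only the in-memory CSV construction runs without credentials.

```python
import io


def load_csv_with_partial_schema(bq_client, table_ref, csv_file):
    """Hypothetical sketch: load a CSV that omits the 'ts' column,
    declaring only the columns present in the file.

    Assumption: because 'ts' is absent from the load schema, BigQuery
    treats it as a missing column and applies its
    default_value_expression instead of writing NULL.
    """
    from google.cloud import bigquery  # deferred: requires google-cloud-bigquery

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        write_disposition='WRITE_APPEND',
        # Only the columns the file actually contains:
        schema=[bigquery.SchemaField('source', 'STRING', 'NULLABLE')],
    )
    job = bq_client.load_table_from_file(csv_file, table_ref, job_config=job_config)
    return job.result()


# Build the one-column CSV in memory (stdlib only, so the sketch is
# self-contained; no temp file needed).
csv_file = io.BytesIO(b"source\ncsv\n")

# The actual load needs valid credentials and an existing table:
# load_csv_with_partial_schema(bigquery.Client(), table_ref, csv_file)
```

If this works in your environment, it avoids allow_jagged_rows entirely, which is what was turning the missing cells into NULLs.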

pb3skfrl 1#

You could consider the following code, which loads CSV data from Cloud Storage into a table.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
table_id = "your-project.your_dataset.your_table_name"

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("post_abbr", "STRING"),
    ],
    skip_leading_rows=1,
    # The source format defaults to CSV, so the line below is optional.
    source_format=bigquery.SourceFormat.CSV,
)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)  # Make an API request.
print("Loaded {} rows.".format(destination_table.num_rows))

To import a local file, refer to this code.
For more details, see this documentation.
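As a rough sketch of the local-file variant (the helper name `load_local_csv` and the autodetect setting are my assumptions, not part of the linked sample), the same LoadJobConfig is used with load_table_from_file on an open file handle instead of a GCS URI:

```python
import tempfile


def load_local_csv(client, table_id, path):
    """Hypothetical sketch: load a local CSV file into BigQuery.

    Mirrors the Cloud Storage sample above, but reads from disk.
    Assumes table_id is a full "project.dataset.table" string.
    """
    from google.cloud import bigquery  # deferred: requires google-cloud-bigquery

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,  # assumption: infer the schema from the file
    )
    with open(path, "rb") as source_file:
        job = client.load_table_from_file(source_file, table_id, job_config=job_config)
    return job.result()  # waits for the job to complete


# Write a small sample CSV locally (stdlib only; the load call itself
# needs credentials, so it is shown but not executed here).
sample = tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False)
sample.write("source\ncsv\n")
sample.close()
# load_local_csv(bigquery.Client(), "your-project.your_dataset.your_table_name", sample.name)
```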
