BigQuery Python API copy_table复制模式但不复制数据

r1zhe5dt  于 2023-05-08  发布在  Python
关注(0)|答案(2)|浏览(401)

我尝试将同一数据集中的一个BigQuery表复制到另一个表,使用www.example.com上的示例https://cloud.google.com/bigquery/docs/managing-tables#copy-table
我创建了一个函数来执行复制,如下所示:

def copy_table (source_table,dest_table):
    client = bigquery.Client()
    source_table_ref="my_project.my_dataset."+source_table
    dest_table_ref="my_project.my_dataset."+dest_table

    job = client.copy_table(
        source_table_ref,
        dest_table_ref)  # API request
    job.result()

但是当我进行复制时,创建了dest_table,其模式与source_table相同,但是没有任何数据从source_table复制到dest_table。
以下是我正在做的事情的整体顺序:
1.创建source_table
1.在source_table中插入行
1.执行查询以检查行是否在source_table中(它们是-- SELECT COUNT()返回正确的行数)
1.使用上面的函数将source_table复制到dest_table
1.执行查询以检查行是否在dest_table中(它们不是-- SELECT COUNT(
)返回零行)
我的猜测是,这些作业以某种方式异步执行,但我不知道如何使它们同步执行。任何想法将不胜感激。
如果有帮助的话,我的总体目标是创建一个新表,并使用每日批处理作业的结果(例如get_user_info_2020-06-27)填充,然后将其复制到始终包含当天用户信息的get_user_info_current表。
编辑:
更多信息,基于测试:
在https://cloud.google.com/bigquery/streaming-data-into-bigquery#datavailability页面上,它说:“数据最多需要90分钟才能用于拷贝操作”。所以,我在insert语句之后编写了一个小东西,以等待它完成:

def insert_table_wait(table_name,prev_rows,rows_inserted):
    client = bigquery.Client()
    table_id = "pacs-user-analysis-dev.google_users."+table_name
    table = client.get_table(table_id)  # Make an API request.

    #wait until the insert fully completes
    curr_table=client.get_table(table_id)
    sys.stderr.write(str(datetime.datetime.now()) +" "+table_name +" properties: "+str(curr_table._properties)+"\n")
    curr_rows=int(curr_table._properties.get('numRows'))
    while curr_table._properties.get('streamingBuffer') is not None or curr_rows != prev_rows+rows_inserted:
        sys.stderr.write(str(datetime.datetime.now()) +" Waiting for insert into "+str(curr_table._properties.get('id'))+" to complete. StreamingBuffer details: "+str(curr_table._properties.get('streamingBuffer'))+" prev_rows: "+str(prev_rows)+" curr_rows: "+str(curr_rows)+ " should be: " + str(prev_rows+rows_inserted)+"\n")
        time.sleep(10)
        curr_table=client.get_table(table_id)
        curr_rows=int(curr_table._properties.get('numRows') )

我希望这能解决问题我不明白的是,新行几乎立即出现在BigQuery控制台UI中,但table._properties.get('numRows')似乎没有及时更新。

inb24sb2

inb24sb21#

由于流缓冲区中的数据有限制i copy jobs,我建议您使用query job并设置目标表,就像下面的代码一样:

from google.cloud import bigquery
client = bigquery.Client(project = "your-project")

job_config = bigquery.QueryJobConfig(destination="destination-table-id")
# SELECT * to copy the whole table
sql = "SELECT * FROM <source_table>"
query_job = client.query(sql, job_config=job_config)
query_job.result()

如果对你有帮助请告诉我

lp0sw83n

lp0sw83n2#

如果有人在2023年遇到这个问题(是的,仍然是一个问题,也适用于google-cloud-bigquery==3.9.0):@rmesteves的回复还是要走的路。如果你要复制到一个现有的表,那么不要忘记将write_disposition= bigquery.WriteDisposition.WRITE_TRUNCATE添加到QueryJobConfig中:

job_config = bigquery.QueryJobConfig()
job_config.destination: bigquery.TableReference = "destination"
job_config.write_disposition: str = bigquery.WriteDisposition.WRITE_TRUNCATE

sql = f"SELECT * FROM {source}"
query_job = bq_client.query(query=sql, job_config=job_config)
query_job.result()

相关问题