我尝试将同一数据集中的一个BigQuery表复制到另一个表,使用www.example.com上的示例https://cloud.google.com/bigquery/docs/managing-tables#copy-table
我创建了一个函数来执行复制,如下所示:
def copy_table (source_table,dest_table):
client = bigquery.Client()
source_table_ref="my_project.my_dataset."+source_table
dest_table_ref="my_project.my_dataset."+dest_table
job = client.copy_table(
source_table_ref,
dest_table_ref) # API request
job.result()
但是当我进行复制时,创建了dest_table,其模式与source_table相同,但是没有任何数据从source_table复制到dest_table。
以下是我正在做的事情的整体顺序:
1.创建source_table
1.在source_table中插入行
1.执行查询以检查行是否在source_table中(它们是-- SELECT COUNT()返回正确的行数)
1.使用上面的函数将source_table复制到dest_table
1.执行查询以检查行是否在dest_table中(它们不是-- SELECT COUNT()返回零行)
我的猜测是,这些作业以某种方式异步执行,但我不知道如何使它们同步执行。任何想法将不胜感激。
如果有帮助的话,我的总体目标是创建一个新表,并使用每日批处理作业的结果(例如get_user_info_2020-06-27)填充,然后将其复制到始终包含当天用户信息的get_user_info_current表。
编辑:
更多信息,基于测试:
在https://cloud.google.com/bigquery/streaming-data-into-bigquery#datavailability页面上,它说:“数据最多需要90分钟才能用于拷贝操作”。所以,我在insert语句之后编写了一个小东西,以等待它完成:
def insert_table_wait(table_name,prev_rows,rows_inserted):
client = bigquery.Client()
table_id = "pacs-user-analysis-dev.google_users."+table_name
table = client.get_table(table_id) # Make an API request.
#wait until the insert fully completes
curr_table=client.get_table(table_id)
sys.stderr.write(str(datetime.datetime.now()) +" "+table_name +" properties: "+str(curr_table._properties)+"\n")
curr_rows=int(curr_table._properties.get('numRows'))
while curr_table._properties.get('streamingBuffer') is not None or curr_rows != prev_rows+rows_inserted:
sys.stderr.write(str(datetime.datetime.now()) +" Waiting for insert into "+str(curr_table._properties.get('id'))+" to complete. StreamingBuffer details: "+str(curr_table._properties.get('streamingBuffer'))+" prev_rows: "+str(prev_rows)+" curr_rows: "+str(curr_rows)+ " should be: " + str(prev_rows+rows_inserted)+"\n")
time.sleep(10)
curr_table=client.get_table(table_id)
curr_rows=int(curr_table._properties.get('numRows') )
我希望这能解决问题我不明白的是,新行几乎立即出现在BigQuery控制台UI中,但table._properties.get('numRows')似乎没有及时更新。
2条答案
按热度按时间inb24sb21#
由于流缓冲区中的数据有限制i
copy jobs
,我建议您使用query job
并设置目标表,就像下面的代码一样:如果对你有帮助请告诉我
lp0sw83n2#
如果有人在2023年遇到这个问题(是的,仍然是一个问题,也适用于
google-cloud-bigquery==3.9.0
):@rmesteves的回复还是要走的路。如果你要复制到一个现有的表,那么不要忘记将write_disposition= bigquery.WriteDisposition.WRITE_TRUNCATE
添加到QueryJobConfig中: