I have a list of CSV files, and I want to read their rows and push them to BigQuery in order. Currently I read each CSV with pandas and load the data into BigQuery with the to_gbq method. However, because the files are large (a few GB each), I would like to ingest the data in batches to avoid memory errors.
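As context for the answer below, a minimal sketch of one common way to bound memory with the current to_gbq approach: pandas' read_csv(chunksize=...) streams the file in fixed-size row chunks, so only one chunk is in memory at a time. The file name and chunk size here are placeholders; the table and project IDs are the ones used later in this thread.

import pandas as pd

TABLE = 'test.test_table'        # dataset.table
PROJECT = 'high-theme-12435'

# read_csv(chunksize=...) returns an iterator of DataFrames with at most
# chunksize rows each, so the whole file is never held in memory at once.
for chunk in pd.read_csv('test.csv', chunksize=100_000):
    chunk.to_gbq(TABLE, project_id=PROJECT, if_exists='append')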
1 Answer

ddrv8njm1#
I wrote the updated function below; bq-client seems to be faster than to_gbq.
from google.cloud import bigquery
import pandas
from tqdm import tqdm
import warnings

# Surface the deprecation warnings that the CSV serialization path of
# load_table_from_dataframe emits, instead of letting them be filtered out.
warnings.simplefilter("always", category=PendingDeprecationWarning)
warnings.simplefilter("always", category=DeprecationWarning)

def df_to_bq(df, table_id, table_schema, batch_size=None):
    client = bigquery.Client(project='high-theme-12435')
    job_config = bigquery.LoadJobConfig(
        schema=table_schema,
        source_format=bigquery.SourceFormat.CSV,
    )
    if batch_size is None:
        # Load the whole DataFrame in a single job.
        job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
        job.result()  # wait for the load to complete
    else:
        # Load the DataFrame in slices of batch_size rows, in order.
        for batch_no, i in tqdm(enumerate(range(0, len(df), batch_size))):
            batch_df = df.iloc[i:i + batch_size]
            job = client.load_table_from_dataframe(
                batch_df, table_id, job_config=job_config
            )
            job.result()  # block so batches are appended sequentially
            print(f"### DUMP to BQ done for batch {batch_no}. "
                  f"({i} to {i + len(batch_df)}.) ###")
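Each call to load_table_from_dataframe starts an asynchronous load job; calling job.result() blocks until it completes, which keeps the batches sequential. Successive batches accumulate in the table because BigQuery load jobs default to WRITE_APPEND.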
table_schema = [
    bigquery.SchemaField("col1", "INTEGER"),
    bigquery.SchemaField("col2", "STRING"),
    bigquery.SchemaField("col3", "TIMESTAMP"),
    bigquery.SchemaField("col4", "FLOAT"),
]
import pandas as pd
from dateutil import parser

df = pd.read_csv('test.csv')

def from_iso_date(date_str):
    # Parse an ISO-8601 timestamp string; empty values become None.
    if not date_str:
        return None
    return parser.parse(date_str)

df['timecol'] = pd.to_datetime('now')
df['col3'] = df['col3'].apply(from_iso_date)

table_id = 'high-theme-12435.test.test_table'
df_to_bq(df, table_id, table_schema, batch_size=1000)
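Since the question mentions a list of multi-GB files, here is one possible way to combine pandas' chunked reader with the df_to_bq function, from_iso_date helper, table_id, and table_schema defined above, so that no file is ever fully loaded into memory. The file names and chunk size are hypothetical.

import pandas as pd

csv_files = ['part1.csv', 'part2.csv']  # hypothetical list of CSV paths

for path in csv_files:
    # Stream each file in 100k-row chunks; only one chunk is in memory.
    for chunk in pd.read_csv(path, chunksize=100_000):
        chunk['timecol'] = pd.to_datetime('now')
        chunk['col3'] = chunk['col3'].apply(from_iso_date)
        df_to_bq(chunk, table_id, table_schema)  # one load job per chunk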