pandas: Write a pandas DataFrame (from CSV) to BigQuery in batch mode

Posted by mnemlml8 on 2023-01-15 in Other

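I have a list of CSV files whose rows I want to copy and push to BigQuery (BQ) one file after another. Currently I read each CSV file with pandas and use the to_gbq method to load the data into BigQuery. However, because the files are large (several GB each), I want to ingest the data in batches to avoid running out of memory.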

ddrv8njm #1

I wrote the updated function below; the BigQuery client (bq-client) seems to be faster than to_gbq.

import warnings

import pandas as pd
from dateutil import parser
from google.cloud import bigquery
from tqdm import tqdm

# Always show deprecation warnings raised by the libraries.
warnings.simplefilter("always", category=PendingDeprecationWarning)
warnings.simplefilter("always", category=DeprecationWarning)


def df_to_bq(df, table_id, table_schema, batch_size=None):
    """Load a DataFrame into a BigQuery table, optionally in row batches."""
    client = bigquery.Client(project='high-theme-12435')
    job_config = bigquery.LoadJobConfig(schema=table_schema, source_format=bigquery.SourceFormat.CSV)

    if batch_size is None:
        # Single load job for the whole DataFrame.
        job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
        job.result()  # wait for the load job to finish; raises if it failed
    else:
        # One load job per slice of batch_size rows.
        for batch_no, i in enumerate(tqdm(range(0, len(df), batch_size))):
            batch_df = df.iloc[i:i + batch_size]
            job = client.load_table_from_dataframe(batch_df, table_id, job_config=job_config)
            job.result()  # wait, so "done" below is only printed after the load succeeded
            print(f"### DUMP to BQ done for batch {batch_no}. ({i} to {i + len(batch_df)}.) ###")


table_schema = [
    bigquery.SchemaField("col1", "INTEGER"),
    bigquery.SchemaField("col2", "STRING"),
    bigquery.SchemaField("col3", "TIMESTAMP"),
    bigquery.SchemaField("col4", "FLOAT"),
]

df = pd.read_csv('test.csv')


def from_iso_date(date_str):
    """Parse an ISO date string; return None for empty or missing (NaN) values."""
    if pd.isna(date_str) or not date_str:
        return None
    return parser.parse(date_str)


# Extra column stamped with the load time (not listed in table_schema above).
df['timecol'] = pd.to_datetime('now')
df['col3'] = df['col3'].apply(from_iso_date)

table_id = 'high-theme-12435.test.test_table'
df_to_bq(df, table_id, table_schema, batch_size=1000)
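
Note that pd.read_csv('test.csv') still reads the whole file into memory before it is sliced, so batching only the upload may not prevent memory errors on multi-GB files. A minimal sketch of reading the CSV itself in chunks follows, using the chunksize parameter of pandas' read_csv. The helper name load_csv_in_batches and the load_batch callback are hypothetical, introduced here only for illustration; the BigQuery wiring in the trailing comment is an assumption about how it might be connected and is not executed.

```python
import pandas as pd


def load_csv_in_batches(sources, batch_size, load_batch):
    """Read each CSV in `sources` chunk by chunk and pass every chunk to `load_batch`.

    Returns the total number of rows handed to `load_batch`.
    """
    total_rows = 0
    for source in sources:  # files are processed in the order given
        # With chunksize set, read_csv returns an iterator of DataFrames
        # instead of one DataFrame, so only one chunk is held at a time.
        for batch_df in pd.read_csv(source, chunksize=batch_size):
            load_batch(batch_df)
            total_rows += len(batch_df)
    return total_rows


# Hypothetical wiring to BigQuery (not run here; needs credentials):
#
#   client = bigquery.Client(project='high-theme-12435')
#   job_config = bigquery.LoadJobConfig(schema=table_schema)
#
#   def load_batch(batch_df):
#       job = client.load_table_from_dataframe(batch_df, table_id, job_config=job_config)
#       job.result()  # wait for each chunk before reading the next
#
#   load_csv_in_batches(['file1.csv', 'file2.csv'], 100_000, load_batch)
```

Passing the upload step in as a callback keeps the chunked reading testable without a BigQuery project; any per-chunk transformation (such as the date parsing above) could be applied inside load_batch before the upload.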
