python 将BigQuery数据导出为CSV,而无需使用Google Cloud Storage

zengzsys  于 2023-04-10  发布在  Python
关注(0)|答案(8)|浏览(222)

我目前正在写一个软件,用于导出大量的BigQuery数据,并将查询结果以CSV文件的形式存储在本地。我使用的是Python 3和google提供的客户端。我做了配置和认证,但问题是,我不能将数据存储在本地。每次执行时,我都会得到以下错误消息
googleapiclient.errors.HttpError:https://www.googleapis.com/bigquery/v2/projects/round-office-769/jobs?alt=json返回“无效的提取目标URI 'response/file-name-*. csv'。必须是有效的Google存储路径。"〉

这是我的作业配置:

def export_table(service, cloud_storage_path,
             projectId, datasetId, tableId, sqlQuery,
             export_format="CSV",
             num_retries=5):

# Generate a unique job_id so retries
# don't accidentally duplicate export
job_data = {
    'jobReference': {
        'projectId': projectId,
        'jobId': str(uuid.uuid4())
    },
    'configuration': {
        'extract': {
            'sourceTable': {
                'projectId': projectId,
                'datasetId': datasetId,
                'tableId': tableId,
            },
            'destinationUris': ['response/file-name-*.csv'],
            'destinationFormat': export_format
        },
        'query': {
            'query': sqlQuery,
        }
    }
}
return service.jobs().insert(
    projectId=projectId,
    body=job_data).execute(num_retries=num_retries)

我希望我可以只使用本地路径而不是云存储来存储数据,但我错了。

我的问题是:

我可以在本地下载查询的数据(或下载到本地数据库),还是必须使用Google Cloud Storage?

qlvxas9a

qlvxas9a1#

您需要使用Google Cloud Storage进行导出工作。从BigQuery导出数据的解释是here,还可以检查不同路径语法的变体。
然后,您可以将文件从GCS下载到本地存储。
Gsutil工具可以帮助您进一步从GCS下载文件到本地机器。
你不能在本地一次下载,你首先需要导出到GCS,然后再转移到本地机器。

sbtkgmzw

sbtkgmzw2#

您可以使用分页机制直接下载所有数据(无需通过Google Cloud Storage路由)。基本上,您需要为每个页面生成一个页面令牌,下载页面中的数据并迭代此操作,直到所有数据都已下载完毕,即没有更多令牌可用。下面是Java中的示例代码,希望能澄清这个想法:

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.client.util.Data;
import com.google.api.services.bigquery.model.*;

/* your class starts here */

private String projectId = ""; /* fill in the project id here */
private String query = ""; /* enter your query here */
private Bigquery bigQuery;
private Job insert;
private TableDataList tableDataList;
private Iterator<TableRow> rowsIterator;
private List<TableRow> rows;
private long maxResults = 100000L; /* max number of rows in a page */

/* run query */
public void open() throws Exception {
    HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
    JsonFactory jsonFactory = new JacksonFactory();
    GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);
    if (credential.createScopedRequired())
        credential = credential.createScoped(BigqueryScopes.all());
    bigQuery = new Bigquery.Builder(transport, jsonFactory, credential).setApplicationName("my app").build();

    JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query);
    JobConfiguration jobConfig = new JobConfiguration().setQuery(queryConfig);
    Job job = new Job().setConfiguration(jobConfig);
    insert = bigQuery.jobs().insert(projectId, job).execute();
    JobReference jobReference = insert.getJobReference();

    while (true) {
        Job poll = bigQuery.jobs().get(projectId, jobReference.getJobId()).execute();
        String state = poll.getStatus().getState();
        if ("DONE".equals(state)) {
            ErrorProto errorResult = poll.getStatus().getErrorResult();
            if (errorResult != null)
                throw new Exception("Error running job: " + poll.getStatus().getErrors().get(0));
            break;
        }
        Thread.sleep(10000);
    }

    tableDataList = getPage();
    rows = tableDataList.getRows();
    rowsIterator = rows != null ? rows.iterator() : null;
}

/* read data row by row */
public /* your data object here */ read() throws Exception {
    if (rowsIterator == null) return null;

    if (!rowsIterator.hasNext()) {
        String pageToken = tableDataList.getPageToken();
        if (pageToken == null) return null;
        tableDataList = getPage(pageToken);
        rows = tableDataList.getRows();
        if (rows == null) return null;
        rowsIterator = rows.iterator();
    }

    TableRow row = rowsIterator.next();
    for (TableCell cell : row.getF()) {
        Object value = cell.getV();
        /* extract the data here */
    }

    /* return the data */
}

private TableDataList getPage() throws IOException {
    return getPage(null);
}

private TableDataList getPage(String pageToken) throws IOException {
    TableReference sourceTable = insert
            .getConfiguration()
            .getQuery()
            .getDestinationTable();
    if (sourceTable == null)
        throw new IllegalArgumentException("Source table not available. Please check the query syntax.");
    return bigQuery.tabledata()
            .list(projectId, sourceTable.getDatasetId(), sourceTable.getTableId())
            .setPageToken(pageToken)
            .setMaxResults(maxResults)
            .execute();
}
mjqavswn

mjqavswn3#

使用Python panda将数据从BigQuery表导出到CSV文件:

import pandas as pd
from google.cloud import bigquery

selectQuery = """SELECT * FROM dataset-name.table-name"""
bigqueryClient = bigquery.Client()
df = bigqueryClient.query(selectQuery).to_dataframe()
df.to_csv("file-name.csv", index=False)
omqzjyyz

omqzjyyz4#

您可以对该表运行tabledata.list()操作,并设置“alt=csv”,这将以CSV形式返回表的开头。

bwntbbo3

bwntbbo35#

如果你安装了Google BigQuery API、panda和pandas.io,你就可以在Jupyter笔记本里运行Python,查询BQ表,然后把数据放到本地的dataframe里,再从那里把数据写成CSV格式。

gorkyyrv

gorkyyrv6#

另一种方法是从UI执行此操作,一旦返回查询结果,您可以选择“Download as CSV”按钮。

axkjgtzd

axkjgtzd7#

就像Mikhail Berlyant说的,
BigQuery不提供将查询结果直接导出/下载到GCS或本地文件的功能。
您仍然可以使用WebUI导出它,只需三个步骤
1.配置查询以将结果保存在BigQuery表中并运行它。
1.将表导出到GCS中的存储桶。
1.从存储桶下载。
要确保成本保持在较低水平,只需确保在将内容导出到GCS后删除表,并在将文件下载到计算机后删除存储桶中的内容和存储桶。

第一步

在BigQuery屏幕中,在运行查询之前,转到更多〉查询设置

这将打开以下内容

这里你想要有

  • 目的地:设置查询结果的目标表
  • 项目名称:选择项目。
  • 数据集名称:选择一个数据集。如果没有,请创建它并返回。
  • 表名:给予您想要的任何名称(必须仅包含字母、数字或下划线)。
  • 结果大小:允许大型结果(无大小限制)。

然后保存它,查询被配置为保存在一个特定的表中。现在你可以运行查询。

第二步

要将其导出到GCP,您必须转到该表并单击“导出”〉“导出到GCS”。

这将打开以下屏幕

在【选择GCS位置】中,定义存储桶、文件夹和文件。
例如,您有一个名为 daria_bucket 的存储桶(* 仅使用小写字母、数字、连字符(-)和下划线(_)。点(.)可用于形成有效的域名。*),并且希望将文件保存在名称为 test 的存储桶根目录中,然后您(在Select GCS location中)

daria_bucket/test.csv

如果文件太大(超过1GB),你会得到一个错误。要修复它,你必须使用通配符将它保存在更多的文件中。所以,你需要添加 *,就像这样

daria_bucket/test*.csv

这将在daria_bucket中存储从表中提取的所有数据,这些数据将存储在多个文件中,这些文件名为test 000000000000,test 000000000001,test 00000000002,... testX。

第三步

然后去仓库,你会看到水桶。

进入它里面,你会发现一个(或多个)文件。然后你可以从那里下载。

6ju8rftf

6ju8rftf8#

也许你可以使用谷歌提供的simba odbc驱动程序,使用任何提供odbc连接的工具来创建csv。它甚至可以是微软的ssis,你甚至不需要编码。

相关问题