python 有没有办法将策略标记附加到Bigquery表?

ogsagwnx  于 2023-03-06  发布在  Python
关注(0)|答案(2)|浏览(216)

我一直在尝试使用GCP的Python客户端库将策略标签附加到现有的BigQuery表,但似乎找不到方法。
我知道如何在PolicyTagManager中创建分类和策略标记
但我似乎不能附上政策标签。
我找到了'google.cloud.bigquery.schema.PolicyTagList',但我不确定tableID是在哪里指定的,也不确定如何使用此方法附加策略标记?
提前感谢帮助!

2ic8powd

2ic8powd1#

假设:在使用策略标记更新每个列之前和之后保持列名相同

步骤:
1.使用get_table()提取表并迭代其模式(SchemaField列表)
1.创建新的SchemaField对象,通过引用SchemaField的名称/描述(来自步骤#1)添加参数值,policy_tags除外(类型= PolicyTagList示例:google.cloud.bigquery.SchemaField(name=column.name, field_type=column.field_type, policy_tags=google.cloud.bigquery.PolicyTagList(["projects/some-project/locations/us/taxonomies/123454321/policyTags/180000"]))
1.将这些新的SchemaFields对象追加到列表中,并使用此新列表重新分配表的模式
1.使用update_table更新架构,例如:
google.cloud.bigquery.Client(project = BIGQUERY_PROJECT).update_table(table, ['schema'])

sirbozc5

sirbozc52#

使用@Asad Siddiqui步骤,创建以下代码。
下面将介绍如何使用GCP中的云函数自动更新模式。
工艺流程:上传一个csv文件到GCS bucket中--〉用下面的代码触发云函数&应用csv文件中提到的表的模式。

    • CSV模板格式:**

import pandas as pd
from google.cloud import bigquery
bigquery_client = bigquery.Client()

def pii_auto_apply_policies(event, context):

    # READING INPUT CSV FILE TEMPLATE DATA:
    bucket_name = event['bucket']
    object_name = event['name']
    print(object_name, bucket_name)

    path = "gs://{}/{}".format(bucket_name, object_name)
    data = pd.read_csv(path, encoding='utf-8')
    print(data.head())

    for row in range(0, len(data.table_id)):

        table = bigquery_client.get_table(data.table_id[row])  # Make an API request.
        original_schema = table.schema

        # Creating new schema list to create a new one.
        new_schema = list()

        # Looping through originial schema to create a new schema list:
        for schema_row in original_schema:

            # 1st if - appends all other column schemas other than the ones required.
            if schema_row.name != data.field_name_in_table[row] or schema_row.field_type != data.data_type_in_table[row]:
                new_schema.append(schema_row)
                
            # 2nd if - picks only the required column and appends with policy tag resource id.
            if schema_row.name == data.field_name_in_table[row] and schema_row.field_type == data.data_type_in_table[row]:
                new_schema.append(bigquery.SchemaField(name=schema_row.name,
                                    field_type=schema_row.field_type,
                                    policy_tags=bigquery.PolicyTagList([data.policy_tag_resource[row]])))

        print("Schema length for table: {}, before: {}, after: {}".format(data.table_id[row], 
                                                                        len(original_schema), len(new_schema)))

        # Applying the schema to table
        table.schema = new_schema
        table = bigquery_client.update_table(table, ["schema"])
        print("Table updated:{}".format(data.table_id[row]))

    return "OK"

局限性:这不适用于"记录"类型的列(其中有嵌套字段)。我将更新第二个版本,以防万一,如果我找到了解决方案。

相关问题