import pandas as pd
from google.cloud import bigquery
bigquery_client = bigquery.Client()
def pii_auto_apply_policies(event, context):
# READING INPUT CSV FILE TEMPLATE DATA:
bucket_name = event['bucket']
object_name = event['name']
print(object_name, bucket_name)
path = "gs://{}/{}".format(bucket_name, object_name)
data = pd.read_csv(path, encoding='utf-8')
print(data.head())
for row in range(0, len(data.table_id)):
table = bigquery_client.get_table(data.table_id[row]) # Make an API request.
original_schema = table.schema
# Creating new schema list to create a new one.
new_schema = list()
# Looping through originial schema to create a new schema list:
for schema_row in original_schema:
# 1st if - appends all other column schemas other than the ones required.
if schema_row.name != data.field_name_in_table[row] or schema_row.field_type != data.data_type_in_table[row]:
new_schema.append(schema_row)
# 2nd if - picks only the required column and appends with policy tag resource id.
if schema_row.name == data.field_name_in_table[row] and schema_row.field_type == data.data_type_in_table[row]:
new_schema.append(bigquery.SchemaField(name=schema_row.name,
field_type=schema_row.field_type,
policy_tags=bigquery.PolicyTagList([data.policy_tag_resource[row]])))
print("Schema length for table: {}, before: {}, after: {}".format(data.table_id[row],
len(original_schema), len(new_schema)))
# Applying the schema to table
table.schema = new_schema
table = bigquery_client.update_table(table, ["schema"])
print("Table updated:{}".format(data.table_id[row]))
return "OK"
2条答案
按热度按时间2ic8powd1#
假设:在使用策略标记更新每个列之前和之后保持列名相同
步骤:
1.使用
get_table()
提取表并迭代其模式(SchemaField列表)1.创建新的SchemaField对象,通过引用SchemaField的名称/描述(来自步骤#1)添加参数值,
policy_tags
除外(类型= PolicyTagList示例:google.cloud.bigquery.SchemaField(name=column.name, field_type=column.field_type, policy_tags=google.cloud.bigquery.PolicyTagList(["projects/some-project/locations/us/taxonomies/123454321/policyTags/180000"]))
1.将这些新的SchemaFields对象追加到列表中,并使用此新列表重新分配表的模式
1.使用
update_table
更新架构,例如:google.cloud.bigquery.Client(project = BIGQUERY_PROJECT).update_table(table, ['schema'])
sirbozc52#
使用@Asad Siddiqui步骤,创建以下代码。
下面将介绍如何使用GCP中的云函数自动更新模式。
工艺流程:上传一个csv文件到GCS bucket中--〉用下面的代码触发云函数&应用csv文件中提到的表的模式。
局限性:这不适用于"记录"类型的列(其中有嵌套字段)。我将更新第二个版本,以防万一,如果我找到了解决方案。