We are trying to read a table from BigQuery into a Spark DataFrame.
The table's structure is as follows:
The following PySpark code is used to read the data:
from google.oauth2 import service_account
from google.cloud import bigquery
import json
import base64 as bs
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType, DecimalType
schema = "schema_name"
project_id = "project_id"
table_name = "simple"
# table_name = "jsonres"
schema_table_name = str(project_id) + "." + str(schema) + "." + str(table_name)
credentials_dict = {"Insert_actual_credentials": "here"}
credentials = service_account.Credentials.from_service_account_info(credentials_dict)
client = bigquery.Client(credentials=credentials, project=project_id)
query = "SELECT * FROM `{}`;".format(schema_table_name)
# print(query)
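query_job = client.query(query)
query_job.result()  # wait for the query to finish; its results land in a temporary destination table
# The connector's "credentials" option expects the service-account key JSON, base64-encoded
s = json.dumps(credentials_dict)
res = bs.b64encode(s.encode('utf-8'))
ans = res.decode("utf-8")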
try:
    # `spark` is assumed to be an existing SparkSession (e.g. the one a notebook provides)
    df = spark.read.format('bigquery') \
        .option("credentials", ans) \
        .option("parentProject", project_id) \
        .option("project", project_id) \
        .option("mode", "DROPMALFORMED") \
        .option('dataset', query_job.destination.dataset_id) \
        .load(query_job.destination.table_id)
    df.printSchema()
    print(df)
    df.show()
except Exception as exp:
    print(exp)
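The base64 step exists because the spark-bigquery connector's `credentials` option takes the service-account key JSON base64-encoded, not a file path. A minimal stdlib-only sketch of that step as a reusable helper, plus a decoder for sanity-checking the value; `encode_credentials` and `decode_credentials` are hypothetical names, not part of any library:

```python
import base64
import json


def encode_credentials(credentials_dict: dict) -> str:
    """Serialize a service-account key dict to JSON, then base64-encode it (UTF-8)."""
    raw = json.dumps(credentials_dict).encode("utf-8")
    return base64.b64encode(raw).decode("utf-8")


def decode_credentials(encoded: str) -> dict:
    """Inverse of encode_credentials: base64-decode and parse the JSON back into a dict."""
    return json.loads(base64.b64decode(encoded).decode("utf-8"))


if __name__ == "__main__":
    creds = {"Insert_actual_credentials": "here"}  # placeholder, as in the question
    ans = encode_credentials(creds)
    assert decode_credentials(ans) == creds
    print(ans)
```

The result can be passed as `.option("credentials", encode_credentials(credentials_dict))`, which is equivalent to the three `json.dumps` / `b64encode` / `decode` lines in the question.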
For a simple table, we can successfully read the table into a DataFrame.
However, when the BigQuery table contains a JSON column, we get the following error:
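An error occurred while calling o1138.load.
: java.lang.IllegalStateException: Unexpected type: JSON
	at com.google.cloud.spark.bigquery.SchemaConverters.getStandardDataType
	at com.google.cloud.spark.bigquery.SchemaConverters.getDataType
	at ...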
We also tried supplying a schema when reading the data:
structureSchema = StructType([
    StructField('x', StructType([
        StructField('name', StringType(), True)
    ])),
    StructField("y", DecimalType(), True)
])
print(structureSchema)
try:
    df = spark.read.format('bigquery') \
        .option("credentials", ans) \
        .option("parentProject", project_id) \
        .option("project", project_id) \
        .option("mode", "DROPMALFORMED") \
        .option('dataset', query_job.destination.dataset_id) \
        .schema(structureSchema) \
        .load(query_job.destination.table_id)
    df.printSchema()
    print(df)
    df.show()
except Exception as exp:
    print(exp)
We still got the same error: "java.lang.IllegalStateException: Unexpected type: JSON".
How can we read a BigQuery table with a JSON-type column into a Spark DataFrame?
Update 1: There is an open issue about this problem on GitHub:
While reading a bigquery table, having a JSON type field from Apache Spark throws exception.
Is there any workaround?
1 Answer
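Try the code below and check whether it works for you. Basically, you keep the JSON column as a string and then use Spark functions to extract the JSON content.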
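A minimal sketch of that idea, not the answerer's own code: have BigQuery return each top-level JSON column as a STRING via `TO_JSON_STRING`, so the temporary destination table the connector reads no longer contains a JSON type, then parse the string back in Spark with `pyspark.sql.functions.from_json`. The helper names `build_json_as_string_query` and `parse_json_column` are hypothetical. It is also an assumption that `x` is the JSON column and `y` is NUMERIC, inferred from the schema the question tried to supply.

```python
from typing import Iterable, Tuple


def build_json_as_string_query(table: str, fields: Iterable[Tuple[str, str]]) -> str:
    """Build a SELECT that returns every top-level JSON column as a JSON-formatted STRING.

    `fields` is a sequence of (column_name, bigquery_type) pairs.
    """
    select_items = []
    for name, field_type in fields:
        if field_type.upper() == "JSON":
            # TO_JSON_STRING yields a STRING column, which the connector can map to StringType
            select_items.append(f"TO_JSON_STRING(`{name}`) AS `{name}`")
        else:
            select_items.append(f"`{name}`")
    return f"SELECT {', '.join(select_items)} FROM `{table}`"


def parse_json_column(df, column: str, schema):
    """Parse a JSON string column of a Spark DataFrame back into a struct.

    `schema` may be a StructType or a DDL string such as "name STRING".
    """
    # Imported here so the query builder above can be used without PySpark installed
    from pyspark.sql import functions as F

    return df.withColumn(column, F.from_json(F.col(column), schema))


if __name__ == "__main__":
    print(build_json_as_string_query("project_id.schema_name.jsonres",
                                     [("x", "JSON"), ("y", "NUMERIC")]))
    # SELECT TO_JSON_STRING(`x`) AS `x`, `y` FROM `project_id.schema_name.jsonres`
```

Used with the question's code, this query replaces `query`. After `client.query(...).result()`, `query_job.destination.table_id` is loaded exactly as before, and `df = parse_json_column(df, "x", "name STRING")` turns `x` back into a struct. The `fields` pairs could be taken from `[(f.name, f.field_type) for f in client.get_table(schema_table_name).schema]`. This sketch only handles top-level columns. JSON fields nested inside RECORD columns would need their own `TO_JSON_STRING` expressions.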