Unable to read a BigQuery table with a JSON/RECORD column type into a Spark DataFrame (java.lang.IllegalStateException: Unexpected type: JSON)

xdyibdwo · Posted 2023-01-06 · in Spark
Follow (0) | Answers (1) | Views (85)

We are trying to read a table from BigQuery into a Spark DataFrame. The table structure is shown below.

The following PySpark code is used to read the data.

from google.oauth2 import service_account
from google.cloud import bigquery
import json
import base64 as bs
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType, DecimalType

schema = "schema_name"
project_id = "project_id"

table_name = "simple"
# table_name = "jsonres"
schema_table_name = str(project_id) + "." + str(schema) + "." + str(table_name)
credentials_dict = {"Insert_actual_credentials": "here"}

credentials = service_account.Credentials.from_service_account_info(credentials_dict)
client = bigquery.Client(credentials=credentials, project=project_id)

query = "SELECT * FROM `{}`;".format(schema_table_name)
# print(query)
query_job = client.query(query)
query_job.result()

s = json.dumps(credentials_dict)
res = bs.b64encode(s.encode('utf-8'))
ans = res.decode("utf-8")

try:
    df = spark.read.format('bigquery') \
        .option("credentials", ans) \
        .option("parentProject", project_id) \
        .option("project", project_id) \
        .option("mode", "DROPMALFORMED") \
        .option('dataset', query_job.destination.dataset_id) \
        .load(query_job.destination.table_id)
    df.printSchema()
    print(df)
    df.show()
except Exception as exp:
    print(exp)

For simple tables, we are able to read them into a DataFrame successfully. But when the BigQuery table contains a JSON column, as shown below, we get an error.

We receive the following error:

An error occurred while calling o1138.load. : java.lang.IllegalStateException: Unexpected type: JSON
    at com.google.cloud.spark.bigquery.SchemaConverters.getStandardDataType(…)
    at com.google.cloud.spark.bigquery.SchemaConverters.getDataType(…)

We also tried providing a schema while reading the data.

structureSchema = StructType([
    StructField('x', StructType([
        StructField('name', StringType(), True)
    ])),
    StructField("y", DecimalType(), True)
])
print(structureSchema)

try:
    df = spark.read.format('bigquery') \
        .option("credentials", ans) \
        .option("parentProject", project_id) \
        .option("project", project_id) \
        .option("mode", "DROPMALFORMED") \
        .option('dataset', query_job.destination.dataset_id) \
        .schema(structureSchema) \
        .load(query_job.destination.table_id)
    df.printSchema()
    print(df)
    df.show()
except Exception as exp:
    print(exp)

We still get the same error: "java.lang.IllegalStateException: Unexpected type: JSON".
How can we read a BigQuery table with a JSON-typed column into a Spark DataFrame?
Update 1: There is an open issue about this on GitHub:
While reading a bigquery table, having a JSON type field from Apache Spark throws exception.
Is there any workaround?

tyu7yeag1#

Try the code below and check whether it works for you. Basically, you keep the JSON column as a string, and then you can use Spark functions to extract the JSON content.

import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, StringType, DecimalType

structureSchema = StructType([
    StructField('x', StringType()),
    StructField("y", DecimalType())
])

df = (spark.read.format('bigquery')
        .option("credentials", ans)
        .option("parentProject", project_id)
        .option("project", project_id)
        .option("mode", "DROPMALFORMED")
        .option('dataset', query_job.destination.dataset_id)
        .schema(structureSchema)
        .load(query_job.destination.table_id)
     )

df = df.withColumn("jsonColumnName", f.get_json_object(f.col("x"), "$.name"))
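For reference, `f.get_json_object(col, "$.name")` evaluates a JSONPath expression against a JSON string and returns the matched value, or null when the string is not valid JSON. A minimal plain-Python sketch of that extraction behavior for a flat payload (the function name `get_json_field` is made up for illustration; it is not a Spark or BigQuery API) looks like this:

```python
import json

def get_json_field(json_str, field):
    """Mimic get_json_object(col, "$.field") for a flat JSON object."""
    try:
        return json.loads(json_str).get(field)
    except (json.JSONDecodeError, AttributeError):
        # Malformed or non-object JSON yields None, like Spark returning null
        return None

print(get_json_field('{"name": "alice"}', "name"))  # alice
print(get_json_field('not json', "name"))           # None
```

This is why keeping the JSON column as a `StringType` in the schema works: the connector never has to map BigQuery's JSON type, and the parsing happens later in Spark (or, as here, in ordinary JSON handling).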
