Problem using Relationalize on arrays in AWS Glue

ecr0jaav  posted on 2021-05-27  in Spark

I have been trying to relationalize a table that a Glue crawler created from my JSON data. The table's schema looks like this:

#   Column name      Data type
1   itemdetails      array
2   scannableid      string
3   attemptid        string
4   packagedetails   struct
5   warehouseid      string
6   application      string
7   shipmentid       bigint
8   timestamp        bigint

I then have a job that runs the following script:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
glue_temp_storage = "s3://sqs-test/temp"
dfc_root_table_name = "root" 
tempDir = "s3://sqs-test/temp"

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "booker", table_name = "json_firehose_glue2020", transformation_ctx = "datasource0")
dfc = Relationalize.apply(frame = datasource0, staging_path = glue_temp_storage, name = dfc_root_table_name, transformation_ctx = "dfc")
data = dfc.select(dfc_root_table_name)
dataoutput = glueContext.write_dynamic_frame.from_options(frame = data, connection_type = "s3", connection_options = {"path": 's3://sqs-test/output-flat'}, format = "parquet", transformation_ctx = "dataoutput")

job.commit()

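However, this gives me a Parquet file which, when crawled, produces a table in which the packagedetails struct has been relationalized but itemdetails comes back as a bigint. After looking into this, I found that a separate table named root_itemdetails should be created for my itemdetails array. So I tried the job below to join it with the root table.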

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
glue_temp_storage = "s3://sqs-test/temp"
dfc_root_table_name = "root" 
tempDir = "s3://sqs-test/temp"

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "booker", table_name = "json_firehose_glue2020", transformation_ctx = "datasource0")
dfc = Relationalize.apply(frame = datasource0, staging_path = glue_temp_storage, name = dfc_root_table_name, transformation_ctx = "dfc")
root_df = dfc.select('root')
itemdetails_df = dfc.select('root_itemdetails')
data = Join.apply(root_df, itemdetails_df, 'itemdetails', 'id')
dataoutput = glueContext.write_dynamic_frame.from_options(frame = data, connection_type = "s3", connection_options = {"path": 's3://sqs-test/output-flat'}, format = "parquet", transformation_ctx = "dataoutput")

job.commit()

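I based this change on the following post: "How to relationalize a JSON containing arrays". However, after running this job I get the following error: "AnalysisException: Datasource does not support writing empty or nested empty schemas. Please make sure the data schema has at least one or more column(s)." What changes should I make to my job so that the itemdetails array is relationalized as well?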
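To make the behaviour described above concrete, here is a toy model in plain Python of what the question observes: structs are flattened into dotted columns, each array is moved to a child table, and the root column keeps only an integer key. This is a sketch for illustration, not Glue's implementation. The function names `relationalize_sketch` and `inner_join` and the sample fields `weight` and `sku` are hypothetical, and the `root_itemdetails` / `id` / `index` / `.val.` naming is assumed from the question and from how Relationalize output is commonly described.

```python
from itertools import count


def relationalize_sketch(records, root="root"):
    """Toy model: flatten structs, pivot arrays into '<root>_<column>' tables."""
    tables = {root: []}
    ids = count(1)  # surrogate keys linking a root row to its array rows

    def flatten(struct, prefix, row):
        for key, value in struct.items():
            name = prefix + key
            if isinstance(value, dict):
                # Struct: fold its fields into the same row with dotted names.
                flatten(value, name + ".", row)
            elif isinstance(value, list):
                # Array: the parent row keeps only an integer key.
                array_id = next(ids)
                row[name] = array_id
                child = tables.setdefault(f"{root}_{name}", [])
                for index, element in enumerate(value):
                    child_row = {"id": array_id, "index": index}
                    if isinstance(element, dict):
                        flatten(element, f"{name}.val.", child_row)
                    else:
                        child_row[f"{name}.val"] = element
                    child.append(child_row)
            else:
                row[name] = value

    for record in records:
        row = {}
        flatten(record, "", row)
        tables[root].append(row)
    return tables


def inner_join(left, right, left_key, right_key):
    """Nested-loop inner join over lists of dicts."""
    return [
        {**l, **r}
        for l in left
        for r in right
        if l[left_key] == r[right_key]
    ]


# Hypothetical record shaped like the schema in the question.
sample = [
    {
        "shipmentid": 1,
        "packagedetails": {"weight": 2},
        "itemdetails": [{"sku": "a"}, {"sku": "b"}],
    }
]
tables = relationalize_sketch(sample)
joined = inner_join(tables["root"], tables["root_itemdetails"], "itemdetails", "id")
```

In this model the root table's itemdetails column holds only the integer key, which matches the bigint seen after crawling the first job's output, and joining root.itemdetails to root_itemdetails.id yields one output row per array element. In the real job it may help to print `dfc.keys()` and the schema of each selected frame before the join, to confirm that the child table exists under the expected name and is not empty.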
