Error running a Spark job with Python | nested JSON and dynamic fields

Posted by ao218c7q on 2021-05-27 in Spark

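I have a nested JSON input file on Amazon S3. The JSON contains a list of JSON objects, and each object has a dynamic field, "Extension", which is sometimes a list and sometimes a map. I need to ignore this field and create a schema from the remaining fields. So far I have not managed to do this, and when I try to flatten the records of the DataFrame I get an error.
Once I have the data in the right shape, I need to load it into AWS Elasticsearch so that it can be queried.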
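For the loading step, Elasticsearch's `_bulk` endpoint expects a newline-delimited body in which each document is preceded by an action line. The following is only a sketch of building that body from already-cleaned records: `to_bulk_ndjson` and the index name `lei-records` are hypothetical, and using the LEI code as the document id is an assumption, not something stated in the question.

```python
import json


def to_bulk_ndjson(records, index_name):
    """Render records as the newline-delimited body of a `_bulk` request."""
    lines = []
    for record in records:
        # Assumption: the LEI code is unique and can serve as the document id.
        action = {"index": {"_index": index_name, "_id": record["LEI"]["$"]}}
        lines.append(json.dumps(action))
        lines.append(json.dumps(record))
    # The bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"


# Tiny sample shaped like one entry of the "records" list in the input.
sample = [
    {
        "LEI": {"$": "001GPB6A9XPE8XJICC14"},
        "Entity": {"EntityStatus": {"$": "ACTIVE"}},
    }
]
print(to_bulk_ndjson(sample, "lei-records"), end="")
```

The resulting string would be sent with the `Content-Type: application/x-ndjson` header; authentication against the AWS-hosted cluster is outside the scope of this sketch.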
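My question: is there any way to ignore the dynamic field and create a DataFrame from the relevant fields only? With Jackson, we can apply @JsonIgnore to a field so that it is not read during serialization/deserialization.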
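One way to approximate `@JsonIgnore` here is to strip the unwanted key before Spark infers a schema. The sketch below is an illustration under assumptions, not a verified solution for this dataset: `drop_ignored`, `records_as_json_lines`, and `load_records` are hypothetical helper names, the file is assumed to be readable from a local path, and it is assumed to be small enough to parse on the driver with the standard `json` module.

```python
import json

# Keys to skip, analogous to marking a field with @JsonIgnore.
IGNORED_FIELDS = frozenset({"Extension"})


def drop_ignored(node, ignored=IGNORED_FIELDS):
    """Return a copy of parsed JSON with every ignored key removed."""
    if isinstance(node, dict):
        return {
            key: drop_ignored(value, ignored)
            for key, value in node.items()
            if key not in ignored
        }
    if isinstance(node, list):
        return [drop_ignored(item, ignored) for item in node]
    return node


def records_as_json_lines(path):
    """Parse the file on the driver and emit one JSON string per record."""
    with open(path, encoding="utf-8") as handle:
        document = json.load(handle)
    return [json.dumps(drop_ignored(record)) for record in document["records"]]


def load_records(spark, path):
    """Build a DataFrame with one row per record and no Extension column.

    `spark` is an existing SparkSession; DataFrameReader.json accepts an
    RDD of JSON strings as well as a file path.
    """
    lines = records_as_json_lines(path)
    return spark.read.json(spark.sparkContext.parallelize(lines))
```

Another option is to pass an explicit schema to `spark.read.json` that simply leaves out `Extension`, since JSON fields that are absent from a user-supplied schema are not read.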
I tried creating a new DataFrame using only 3 fields, but the result was a single row:

ndf = df.select("Records.LEI", "Records.Entity","Records.Registration").show(truncate = False)

Result:


|LEI                                             |Entity|Registration                                                                                                                                                                                                                                                                                                       |

|[[001GPB6A9XPE8XJICC14], [004L5FPTUREIWK9T2N63]]|[[[FUND], [ACTIVE], [, [Boston], [US], [245 Summer Street], [02210], [US-MA]], [, [BOSTON], [US], [245 SUMMER STREET], [02110], [US-MA]], [[8888], [OTHER]], [US-MA], [FIDELITY ADVISOR SERIES I - Fidelity Advisor Leveraged Company Stock Fund], [[S000005113], [RA000665]]], [, [ACTIVE], [[888 7th Avenue], [New York], [US], [22nd Floor], [10106], [US-NY]], [[[2711 Centerville Road], [Suite 400]], [Wilmington], [US], [C/O Corporation Service Company], [19808], [US-DE]], [[T91T], [LIMITED PARTNERSHIP]], [US-DE], [Hutchin Hill Capital, LP], [[4386463], [RA000602]]]]|[[[2012-11-29 22:03:00], [2020-06-03 20:03:00], [EVK05KS7XY1DEII3R011], [2021-05-29 13:20:00], [ISSUED], [[S000005113], [RA000665]], [FULLY_CORROBORATED]], [[2012-06-06 21:26:00], [2020-07-17 18:10:00], [EVK05KS7XY1DEII3R011], [2018-05-08 19:16:00], [LAPSED], [[4386463], [RA000602]], [FULLY_CORROBORATED]]]|
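The single row appears because `records` is one array column in a one-row DataFrame: selecting `Records.LEI` pulls the `LEI` field out of every array element and returns the values as an array. A possible way to get one row per record is to explode the array first. This is a minimal sketch, assuming `df` is the DataFrame produced by `spark.read.json(path, multiLine=True)`; `one_row_per_record` is a hypothetical helper name.

```python
def one_row_per_record(df):
    """Turn the single `records` array into one row per array element."""
    # explode() emits one output row for each element of the array column.
    exploded = df.selectExpr("explode(records) AS rec")
    # Keep only the three wanted structs; Extension is simply not selected.
    return exploded.select("rec.LEI", "rec.Entity", "rec.Registration")


# Usage, with `df` read as in the question:
# one_row_per_record(df).show(truncate=False)
```

Note also that `show()` returns `None`, so assigning its result to `ndf` does not keep a DataFrame; the `select(...)` result has to be assigned before calling `show()` on it.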


Full code snippet:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql.functions import flatten
from pyspark.sql import SQLContext

appName = "PySpark - JSON file to Spark Data Frame"
master = "local"

# Initialize contexts and session

path = "C:\\spark\\lei_OrigData.txt"

spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

# Log starting time

dt_start = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print("Start time:", dt_start)

# Read the JSON file (the schema is inferred) and try to flatten the records

df = spark.read.json(path, multiLine=True)
df.select(flatten(df.records)).show(truncate=False)

# The line below tries to create a new DataFrame from 3 fields only, but the result was a single row

# ndf = df.select("Records.LEI","Records.Entity","Records.Registration").show(truncate = False)

# Log end time

dt_end = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

print("End time:", dt_end)

Running the code above produces the following error:

pyspark.sql.utils.AnalysisException: cannot resolve 'flatten(`records`)' due to data type mismatch: The argument should be an array of arrays, but '`records`' is of array<struct<Entity:struct<EntityCategory:struct<$:string>,EntityStatus:struct<$:string>,HeadquartersAddress:struct<AdditionalAddressLine:struct<$:string>,City:struct<$:string>,Country:struct<$:string>,FirstAddressLine:struct<$:string>,PostalCode:struct<$:string>,Region:struct<$:string>>,LegalAddress:struct<AdditionalAddressLine:array<struct<$:string>>,City:struct<$:string>,Country:struct<$:string>,FirstAddressLine:struct<$:string>,PostalCode:struct<$:string>,Region:struct<$:string>>,LegalForm:struct<EntityLegalFormCode:struct<$:string>,OtherLegalForm:struct<$:string>>,LegalJurisdiction:struct<$:string>,LegalName:struct<$:string>,RegistrationAuthority:struct<RegistrationAuthorityEntityID:struct<$:string>,RegistrationAuthorityID:struct<$:string>>>,Extension:struct<gleif:Geocoding:string>,LEI:struct<$:string>,Registration:struct<InitialRegistrationDate:struct<$:timestamp>,LastUpdateDate:struct<$:timestamp>,ManagingLOU:struct<$:string>,NextRenewalDate:struct<$:timestamp>,RegistrationStatus:struct<$:string>,ValidationAuthority:struct<ValidationAuthorityEntityID:struct<$:string>,ValidationAuthorityID:struct<$:string>>,ValidationSources:struct<$:string>>>> type.;;                                   'Project [flatten(records#0) AS flatten(records)#2]
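The message reflects what `flatten` does: it merges an array of arrays into a single array, whereas `records` is an array of structs. To turn array elements into rows, Spark SQL has the generator functions `explode` and `inline`. Below is a minimal sketch using `inline`, assuming `df` is the DataFrame from the snippet above; `records_to_rows` is a hypothetical helper name.

```python
def records_to_rows(df, unwanted=("Extension",)):
    """Expand the array<struct> column `records` into rows and columns."""
    # inline() yields one row per array element and one column per
    # top-level struct field (Entity, Extension, LEI, Registration).
    expanded = df.selectExpr("inline(records)")
    # Remove the columns that should be ignored.
    return expanded.drop(*unwanted)


# Usage, replacing the failing flatten() call:
# records_to_rows(df).show(truncate=False)
```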

Input JSON:

{
  "records": [
    {
      "LEI": {
        "$": "001GPB6A9XPE8XJICC14"
      },
      "Entity": {
        "LegalName": {
          "$": "FIDELITY ADVISOR SERIES I - Fidelity Advisor Leveraged Company Stock Fund"
        },
        "LegalAddress": {
          "FirstAddressLine": {
            "$": "245 SUMMER STREET"
          },
          "City": {
            "$": "BOSTON"
          },
          "Region": {
            "$": "US-MA"
          },
          "Country": {
            "$": "US"
          },
          "PostalCode": {
            "$": "02110"
          }
        },
        "HeadquartersAddress": {
          "FirstAddressLine": {
            "$": "245 Summer Street"
          },
          "City": {
            "$": "Boston"
          },
          "Region": {
            "$": "US-MA"
          },
          "Country": {
            "$": "US"
          },
          "PostalCode": {
            "$": "02210"
          }
        },
        "RegistrationAuthority": {
          "RegistrationAuthorityID": {
            "$": "RA000665"
          },
          "RegistrationAuthorityEntityID": {
            "$": "S000005113"
          }
        },
        "LegalJurisdiction": {
          "$": "US-MA"
        },
        "EntityCategory": {
          "$": "FUND"
        },
        "LegalForm": {
          "EntityLegalFormCode": {
            "$": "8888"
          },
          "OtherLegalForm": {
            "$": "OTHER"
          }
        },
        "EntityStatus": {
          "$": "ACTIVE"
        }
      },
      "Registration": {
        "InitialRegistrationDate": {
          "$": "2012-11-29T16:33:00.000Z"
        },
        "LastUpdateDate": {
          "$": "2020-06-03T14:33:00.000Z"
        },
        "RegistrationStatus": {
          "$": "ISSUED"
        },
        "NextRenewalDate": {
          "$": "2021-05-29T07:50:00.000Z"
        },
        "ManagingLOU": {
          "$": "EVK05KS7XY1DEII3R011"
        },
        "ValidationSources": {
          "$": "FULLY_CORROBORATED"
        },
        "ValidationAuthority": {
          "ValidationAuthorityID": {
            "$": "RA000665"
          },
          "ValidationAuthorityEntityID": {
            "$": "S000005113"
          }
        }
      },
      "Extension": {
        "gleif:Geocoding": {
          "gleif:original_address": {
            "$": "245 Summer Street, 02210, Boston, US-MA, US"
          },
          "gleif:relevance": {
            "$": "0.92"
          },
          "gleif:match_type": {
            "$": "pointAddress"
          },
          "gleif:lat": {
            "$": "42.3514"
          },
          "gleif:lng": {
            "$": "-71.05385"
          },
          "gleif:geocoding_date": {
            "$": "2017-10-23T19:14:11"
          },
          "gleif:bounding_box": {
            "$": "TopLeft.Latitude: 42.3525242, TopLeft.Longitude: -71.0553711, BottomRight.Latitude: 42.3502758, BottomRight.Longitude: -71.0523289"
          },
          "gleif:match_level": {
            "$": "houseNumber"
          },
          "gleif:formatted_address": {
            "$": "245 Summer St, Boston, MA 02210, United States"
          },
          "gleif:mapped_location_id": {
            "$": "NT_PYMT6GOD3rrAC9q2Al5jZB_yQTN"
          },
          "gleif:mapped_street": {
            "$": "Summer St"
          },
          "gleif:mapped_housenumber": {
            "$": "245"
          },
          "gleif:mapped_postalcode": {
            "$": "02210"
          },
          "gleif:mapped_city": {
            "$": "Boston"
          },
          "gleif:mapped_district": {
            "$": "Downtown Boston"
          },
          "gleif:mapped_state": {
            "$": "MA"
          },
          "gleif:mapped_country": {
            "$": "USA"
          }
        }
      }
    },
    {
      "LEI": {
        "$": "004L5FPTUREIWK9T2N63"
      },
      "Entity": {
        "LegalName": {
          "$": "Hutchin Hill Capital, LP"
        },
        "LegalAddress": {
          "FirstAddressLine": {
            "$": "C/O Corporation Service Company"
          },
          "AdditionalAddressLine": [
            {
              "$": "2711 Centerville Road"
            },
            {
              "$": "Suite 400"
            }
          ],
          "City": {
            "$": "Wilmington"
          },
          "Region": {
            "$": "US-DE"
          },
          "Country": {
            "$": "US"
          },
          "PostalCode": {
            "$": "19808"
          }
        },
        "HeadquartersAddress": {
          "FirstAddressLine": {
            "$": "22nd Floor"
          },
          "AdditionalAddressLine": {
            "$": "888 7th Avenue"
          },
          "City": {
            "$": "New York"
          },
          "Region": {
            "$": "US-NY"
          },
          "Country": {
            "$": "US"
          },
          "PostalCode": {
            "$": "10106"
          }
        },
        "RegistrationAuthority": {
          "RegistrationAuthorityID": {
            "$": "RA000602"
          },
          "RegistrationAuthorityEntityID": {
            "$": "4386463"
          }
        },
        "LegalJurisdiction": {
          "$": "US-DE"
        },
        "LegalForm": {
          "EntityLegalFormCode": {
            "$": "T91T"
          },
          "OtherLegalForm": {
            "$": "LIMITED PARTNERSHIP"
          }
        },
        "EntityStatus": {
          "$": "ACTIVE"
        }
      },
      "Registration": {
        "InitialRegistrationDate": {
          "$": "2012-06-06T15:56:00.000Z"
        },
        "LastUpdateDate": {
          "$": "2020-07-17T12:40:00.000Z"
        },
        "RegistrationStatus": {
          "$": "LAPSED"
        },
        "NextRenewalDate": {
          "$": "2018-05-08T13:46:00.000Z"
        },
        "ManagingLOU": {
          "$": "EVK05KS7XY1DEII3R011"
        },
        "ValidationSources": {
          "$": "FULLY_CORROBORATED"
        },
        "ValidationAuthority": {
          "ValidationAuthorityID": {
            "$": "RA000602"
          },
          "ValidationAuthorityEntityID": {
            "$": "4386463"
          }
        }
      },
      "Extension": {
        "gleif:Geocoding": [
          {
            "gleif:original_address": {
              "$": "22nd Floor, 888 7th Avenue, 10106, New York, US-NY, US"
            },
            "gleif:relevance": {
              "$": "0.94"
            },
            "gleif:match_type": {
              "$": "pointAddress"
            },
            "gleif:lat": {
              "$": "40.76537"
            },
            "gleif:lng": {
              "$": "-73.98088"
            },
            "gleif:geocoding_date": {
              "$": "2017-10-25T06:53:52"
            },
            "gleif:bounding_box": {
              "$": "TopLeft.Latitude: 40.7664942, TopLeft.Longitude: -73.9823642, BottomRight.Latitude: 40.7642458, BottomRight.Longitude: -73.9793958"
            },
            "gleif:match_level": {
              "$": "houseNumber"
            },
            "gleif:formatted_address": {
              "$": "888 7th Ave, New York, NY 10106, United States"
            },
            "gleif:mapped_location_id": {
              "$": "NT_42almrnte4m8ALt9ONHN2C_4gDO"
            },
            "gleif:mapped_street": {
              "$": "7th Ave"
            },
            "gleif:mapped_housenumber": {
              "$": "888"
            },
            "gleif:mapped_postalcode": {
              "$": "10106"
            },
            "gleif:mapped_city": {
              "$": "New York"
            },
            "gleif:mapped_district": {
              "$": "Clinton"
            },
            "gleif:mapped_state": {
              "$": "NY"
            },
            "gleif:mapped_country": {
              "$": "USA"
            }
          },
          {
            "gleif:original_address": {
              "$": "C/O Corporation Service Company, 2711 Centerville Road, Suite 400, null, US, US-DE, 19808, Wilmington"
            },
            "gleif:relevance": {
              "$": "0.93"
            },
            "gleif:match_type": {
              "$": "pointAddress"
            },
            "gleif:lat": {
              "$": "39.75411"
            },
            "gleif:lng": {
              "$": "-75.62652"
            },
            "gleif:geocoding_date": {
              "$": "2016-08-16T03:54:45"
            },
            "gleif:bounding_box": {
              "$": "TopLeft.Latitude: 39.7552342, TopLeft.Longitude: -75.6279822, BottomRight.Latitude: 39.7529858, BottomRight.Longitude: -75.6250578"
            },
            "gleif:match_level": {
              "$": "houseNumber"
            },
            "gleif:formatted_address": {
              "$": "2711 Centerville Rd, Wilmington, DE 19808, United States"
            },
            "gleif:mapped_location_id": {
              "$": "NT_8wi0yH62lxXql.LtXORq-C_ycTMxA"
            },
            "gleif:mapped_street": {
              "$": "Centerville Rd"
            },
            "gleif:mapped_housenumber": {
              "$": "2711"
            },
            "gleif:mapped_postalcode": {
              "$": "19808"
            },
            "gleif:mapped_city": {
              "$": "Wilmington"
            },
            "gleif:mapped_state": {
              "$": "DE"
            },
            "gleif:mapped_country": {
              "$": "USA"
            }
          }
        ]
      }
    }

]
}
[enter image description here][1]

  [1]: https://i.stack.imgur.com/ajOJM.png
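In the input above, `gleif:Geocoding` and `AdditionalAddressLine` each appear once as a single object and once as a list. The schema in the error message shows the effect: `LegalAddress.AdditionalAddressLine` was inferred as an array of structs, `HeadquartersAddress.AdditionalAddressLine` as a plain struct, and `gleif:Geocoding` fell back to a string. If those fields are needed later, one option is to normalize them before handing the data to Spark. This is a sketch in plain Python; `normalize` and `LIST_FIELDS` are hypothetical names.

```python
# Fields that may hold either one object or a list of objects.
LIST_FIELDS = frozenset({"gleif:Geocoding", "AdditionalAddressLine"})


def normalize(node, list_fields=LIST_FIELDS):
    """Return a copy of parsed JSON where the listed fields are always lists."""
    if isinstance(node, list):
        return [normalize(item, list_fields) for item in node]
    if isinstance(node, dict):
        result = {}
        for key, value in node.items():
            value = normalize(value, list_fields)
            # Wrap a lone object so every record has the same shape.
            if key in list_fields and not isinstance(value, list):
                value = [value]
            result[key] = value
        return result
    return node


single = {"HeadquartersAddress": {"AdditionalAddressLine": {"$": "888 7th Avenue"}}}
print(normalize(single))
```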
