pyspark:在分解数组后选择一个值

xpcnnkqh  于 2021-05-17  发布在  Spark
关注(0)|答案(1)|浏览(525)

我是pyspark的新手,想在 telecom.system 为 "fax" 或 "phone" 时取出对应的 telecom.value,但出现了下面的错误。我知道filter()会返回一个结构(struct),而我正在从中选择一列。调用filter()之后应该如何选择列的值?
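File "", line 3, in raise_from pyspark.sql.utils.AnalysisException: Resolved attribute(s) telecom#27 missing from name#3,telecom#5,address#7 in operator !Project [name#3.family AS Practitioner_LastName#23, name#3.suffix AS Practitioner_NameSuffix#24, name#3.given[0] AS Practitioner_FirstName#25, telecom#27.value AS telecom.value#42, telecom#33.value AS telecom.value#…, address#7.city AS PractitionerCity#…, address#7.line[0] AS PractitionerAddress_1#…, address#7.postalCode AS PractitionerZip#40, address#7.state AS PractitionerState#41]. Attribute(s) with the same name appear in the operation: telecom,telecom. Please check if the right attribute(s) are used.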

root
 |-- resource: struct (nullable = true)
 |    |-- address: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |    |-- line: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- postalCode: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |    |-- use: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- identifier: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- type: struct (nullable = true)
 |    |    |    |    |-- coding: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |    |-- system: string (nullable = true)
 |    |    |    |-- use: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |-- name: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- family: string (nullable = true)
 |    |    |    |-- given: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- suffix: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- use: string (nullable = true)
 |    |-- resourceType: string (nullable = true)
 |    |-- telecom: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- system: string (nullable = true)
 |    |    |    |-- use: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |-- text: struct (nullable = true)
 |    |    |-- div: string (nullable = true)
 |    |    |-- status: string (nullable = true)

import sys
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

appName = "PySpark Example - JSON file to Spark Data Frame"
master = "local"
spark = SparkSession.builder.appName(appName).master(master).getOrCreate()

json_file_path = 'C:\\Users\\M\\Documents\\Practitioner.json'
source_df = spark.read.json(json_file_path, multiLine=True)

source_df.printSchema()
# Keep only the first name and address entries, but the whole telecom array:
# the phone and fax entries are picked out of it by their `system` value below.
output = source_df.select(source_df["resource.name"][0].alias("name"),
                          source_df["resource.telecom"].alias("telecom"),
                          source_df["resource.address"][0].alias("address"))
output.printSchema()


def telecom_value(system):
    """Return a Column holding the value of the first telecom entry of a kind.

    Args:
        system: The ``telecom.system`` string to match, e.g. ``"phone"``.

    Returns:
        A Column expression evaluated against the ``telecom`` array column
        of the DataFrame it is selected from.
    """
    # The expression refers to `telecom` by name, so it resolves against
    # `output` itself. Selecting a column that belongs to a *different*
    # DataFrame (e.g. output.withColumn(...).filter(...).telecom.value) is
    # what triggers "Resolved attribute(s) telecom#NN missing".
    return f.expr(f"filter(telecom, t -> t.system = '{system}')[0].value")


practitioner = output.select(
    output.name.family.alias("Practitioner_LastName"),
    output.name.suffix.alias("Practitioner_NameSuffix"),
    output.name.given[0].alias("Practitioner_FirstName"),
    # Distinct aliases keep the two telecom-derived columns from sharing a name.
    telecom_value("phone").alias("Practitioner_Phone"),
    telecom_value("fax").alias("Practitioner_Fax"),
    output.address.city.alias("PractitionerCity"),
    output.address.line[0].alias("PractitionerAddress_1"),
    output.address.postalCode.alias("PractitionerZip"),
    output.address.state.alias("PractitionerState"),
)

practitioner.printSchema()
practitioner.show()

我的json是: {"resource":{"resourceType":"Practitioner","id":"scm-ambqa1821624401190","text":{"status":"generated","div":""},"identifier":[{"use":"official","type":{"coding":[{"system":"http:\/\/hl7.org\/fhir\/v2\/0203","code":"NPI"}]},"value":"1548206097"},{"use":"official","type":{"coding":[{"system":"http:\/\/hl7.org\/fhir\/v2\/0203","code":"DEA"}]},"value":"HB1548206"}],"name":[{"use":"official","family":"BERNSTEIN","given":["HELENE","B"],"suffix":["MD"]}],"telecom":[{"system":"phone","value":"6106547854","use":"work"},{"system":"email","value":"sachin.belhekar@allscripts.com","use":"work"},{"system":"fax","value":"7106547895","use":"work"}],"address":[{"use":"work","line":["West Street 1","West Street 2"],"city":"Michigan","state":"MI","postalCode":"49036","country":"USA"}]}}

rqcrx0a6

rqcrx0a61#

数据结构有点复杂,所以我会使用UDF(用户自定义函数)来解析它:

import pyspark.sql.functions as f
import pyspark.sql.types as t

@f.udf(t.StringType())
def phone_parser(row):
    """Return the value of the first ``phone`` entry in a telecom array.

    Args:
        row: The telecom array of one record -- a sequence of entries that
            expose ``system`` and ``value`` by key -- or ``None``.

    Returns:
        The ``value`` of the first entry whose ``system`` is ``'phone'``,
        or ``None`` when the array is null or contains no such entry.
    """
    # The array and its elements can both be null, so guard against None
    # before iterating and before indexing.
    for item in row or ():
        if item is not None and item['system'] == 'phone':
            return item['value']
    return None

@f.udf(t.StringType())
def fax_parser(row):
    """Return the value of the first ``fax`` entry in a telecom array.

    Args:
        row: The telecom array of one record -- a sequence of entries that
            expose ``system`` and ``value`` by key -- or ``None``.

    Returns:
        The ``value`` of the first entry whose ``system`` is ``'fax'``,
        or ``None`` when the array is null or contains no such entry.
    """
    # The array and its elements can both be null, so guard against None
    # before iterating and before indexing.
    for item in row or ():
        if item is not None and item['system'] == 'fax':
            return item['value']
    return None

# Apply each parser UDF to the telecom array column of `output`.
telecom_col = 'telecom'
output.select(*(parser(telecom_col) for parser in (phone_parser, fax_parser)))

相关问题