PySpark: replace multiple values in a string column

vcudknz3, posted 2023-10-15 in Spark

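I am new to Spark and struggling with this. I have two DataFrames (Journey and a country_code mapping) with the input shown below, and I need to generate another DataFrame matching the expected result.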

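Journey:

| ID | Journey |
| -- | -- |
| 1 | US->UK->IN |
| 2 | UK->IN->CH |

Country code map:

| Code | Country |
| -- | -- |
| US | United States |
| IN | India |
| MY | Malaysia |
| UK | United kingdom |
| CH | China |

Expected output:

| ID | Journey | Journey_LongName |
| -- | -- | -- |
| 1 | US->UK->IN | United States->United kingdom->India |
| 2 | UK->IN->CH | United kingdom->India->China |

The country map is dynamic, and the order of the journey must not change in the Journey_LongName field. If any of you have solved this or have an idea, please share your input. Thanks, Dhana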

Answer #1 (8fq7wneg)

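In the end this is just a join; the tricky part is preserving the order. I use transform to attach each element's index so that the order can be restored after the join, then use transform again at the end to drop the index.
Here the two frames are called journey and country. The result is called df.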

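from pyspark.sql import functions as F

# Split each journey into (Code, index) structs and explode to one row per leg
df = journey.selectExpr(
    'ID', 'journey',
    """explode(
          transform(
              split(journey, '->'),
              (x, i) -> struct(x as Code, i as index)
          )
      ) as countries"""
).select(
    'ID', 'journey', 'countries.*'
).join(
    country,   # inner join: codes missing from the map are dropped
    'Code'
).drop('Code').groupBy('ID').agg(
    F.first('journey').alias('journey'),
    # Collect (index, Country) structs so array_sort orders by the integer index;
    # an array would cast the index to string and sort "10" before "2"
    F.array_sort(
        F.collect_list(
            F.struct('index', 'Country')
        )
    ).alias('journey_long')
).selectExpr(
    'ID', 'journey',
    # Drop the index and join the country names back together
    """concat_ws(
           '->',
           transform(journey_long, x -> x.Country)
       ) as journey_long"""
)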

df.show(20,0)
+---+----------+------------------------------------+
|ID |journey   |journey_long                        |
+---+----------+------------------------------------+
|1  |US->UK->IN|United States->United kingdom->India|
|2  |UK->IN->CH|United kingdom->India->China        |
+---+----------+------------------------------------+
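
To make the role of the index easier to see, here is a minimal plain-Python sketch of the same explode, join, sort and concat steps. It is only a model of the logic, not Spark code: the function name `journey_long` is hypothetical, the data is copied from the example tables, and the `reversed` call is just an assumed stand-in for the fact that a grouped collect does not promise any particular order.

```python
# Plain-Python model of the explode -> join -> sort -> concat pipeline.
# The data mirrors the example tables; journey_long is a hypothetical name.
country = {"US": "United States", "IN": "India", "MY": "Malaysia",
           "UK": "United kingdom", "CH": "China"}

def journey_long(journey, mapping):
    # "explode": one (index, code) pair per leg of the journey
    exploded = list(enumerate(journey.split("->")))
    # "join": attach the country name; as with an inner join, unknown codes drop out
    joined = [(i, mapping[code]) for i, code in exploded if code in mapping]
    # Stand-in for an unordered grouped collect: scramble the rows on purpose
    collected = list(reversed(joined))
    # "array_sort": ordering by the numeric index restores the original order
    restored = sorted(collected)
    # Second "transform": discard the index, then join the names with '->'
    return "->".join(name for _, name in restored)

print(journey_long("US->UK->IN", country))
print(journey_long("UK->IN->CH", country))
```

Because the index travels with each country name, the order survives whatever the join and grouping do to the rows in between.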

Answer #2 (pgvzfuti)

Using spark-sql:
Given input 1:

val df1=spark.sql(""" with t1 (
select 1 c1 , 'US->UK->IN'  c2 union all
select 2 c1 , 'UK->IN->CH' c2 )
select c1 id, c2 journey from t1 
"""
)
df1.show(false)
df1.createOrReplaceTempView("df1")

+---+----------+
|id |journey   |
+---+----------+
|1  |US->UK->IN|
|2  |UK->IN->CH|
+---+----------+

Input 2:

val df2=spark.sql(""" with t1 (
select 'US' c1 , 'United States'  c2 union all
select 'IN' c1 , 'India'  c2 union all
select 'MY' c1 , 'Malaysia'  c2 union all
select 'UK' c1 , 'United kingdom'  c2 union all
select 'CH' c1 , 'China'  c2
)
select c1 code, c2 country from t1 
"""
)
df2.show(false)
df2.createOrReplaceTempView("df2")

+----+--------------+
|code|country       |
+----+--------------+
|US  |United States |
|IN  |India         |
|MY  |Malaysia      |
|UK  |United kingdom|
|CH  |China         |
+----+--------------+

Enable cross joins:

spark.sql(" set spark.sql.crossJoin.enabled=true ")

Use posexplode, collect_list and array_sort in spark.sql:

spark.sql("""
select id, journey, concat_ws('->', array_sort(collect_list((e1,country))).country) result from
(
select id, journey, e1, e2, code, country  from 
(select id, journey, posexplode(split(journey,"->")) (e1,e2) from df1  ) t1 cross join 
(select code, country from df2) t2 
where code=e2 ) t3
group by id, journey
order by ID
""")
.show(false)

+---+----------+------------------------------------+
|id |journey   |result                              |
+---+----------+------------------------------------+
|1  |US->UK->IN|United States->United kingdom->India|
|2  |UK->IN->CH|United kingdom->India->China        |
+---+----------+------------------------------------+
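
The query pairs every exploded leg with every mapping row and then keeps only the pairs where `code = e2`. As a rough illustration (plain Python, not Spark, with the data copied from the example), the sketch below suggests that this cross join plus filter selects the same pairs as a direct lookup on the code, and that sorting on the position restores the journey order. The variable names are hypothetical.

```python
from itertools import product

# Like posexplode(split(journey, '->')): (position, code) pairs
legs = list(enumerate("UK->IN->CH".split("->")))
codes = [("US", "United States"), ("IN", "India"), ("MY", "Malaysia"),
         ("UK", "United kingdom"), ("CH", "China")]

# Cross join followed by WHERE code = e2
cross_filtered = [(pos, name)
                  for (pos, e2), (code, name) in product(legs, codes)
                  if code == e2]

# The same pairs obtained with an equi-join style lookup
lookup = dict(codes)
equi_joined = [(pos, lookup[e2]) for pos, e2 in legs if e2 in lookup]

assert sorted(cross_filtered) == sorted(equi_joined)

# array_sort on (position, country), then keep only the country field
result = "->".join(name for _, name in sorted(cross_filtered))
print(result)
```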

Answer #3 (iyr7buue)

Another solution uses nested transform() higher-order functions, which are available in Spark 2.4 and later:

df1.show(false)
df1.createOrReplaceTempView("df1")

+---+----------+
|id |journey   |
+---+----------+
|1  |US->UK->IN|
|2  |UK->IN->CH|
+---+----------+

df2.show(false)
df2.createOrReplaceTempView("df2")

+----+--------------+
|code|country       |
+----+--------------+
|US  |United States |
|IN  |India         |
|MY  |Malaysia      |
|UK  |United kingdom|
|CH  |China         |
+----+--------------+

spark.sql("""
select id, journey,  concat_ws('->',flatten(transform( split(journey,"->") ,(x,i) -> transform(s1, (y,j) -> (y[x]) )  )))  result from df1, 
( select collect_list(map(code,country)) s1 from df2 )
""")
.show(false)

+---+----------+------------------------------------+
|id |journey   |result                              |
+---+----------+------------------------------------+
|1  |US->UK->IN|United States->United kingdom->India|
|2  |UK->IN->CH|United kingdom->India->China        |
+---+----------+------------------------------------+

The query can be shortened further by moving the collect_list subquery into the input of the inner transform:

spark.sql("""
select id, journey, concat_ws('->',flatten(transform( split(journey,"->") ,(x,i) -> transform( (select collect_list(map(code,country)) s1 from df2 ), (y,j) -> (y[x]) )  )))  result from df1 
""")
.show(false)

+---+----------+------------------------------------+
|id |journey   |result                              |
+---+----------+------------------------------------+
|1  |US->UK->IN|United States->United kingdom->India|
|2  |UK->IN->CH|United kingdom->India->China        |
+---+----------+------------------------------------+
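
It may not be obvious why the nested transform produces a clean result. The plain-Python sketch below models the idea under two assumptions: looking up a missing key in a map yields null (the default, non-ANSI behaviour as far as I know), and concat_ws skips nulls. The function name is hypothetical and the data is copied from the example.

```python
# Model of collect_list(map(code, country)): a list of single-entry maps
maps = [{"US": "United States"}, {"IN": "India"}, {"MY": "Malaysia"},
        {"UK": "United kingdom"}, {"CH": "China"}]

def lookup_via_nested_transform(journey, maps):
    # Outer transform: one inner list per code.
    # Inner transform: probe every map; a miss gives None (null in Spark).
    nested = [[m.get(code) for m in maps] for code in journey.split("->")]
    # flatten: a single list in journey order, mostly None
    flat = [value for inner in nested for value in inner]
    # concat_ws: nulls are skipped, so only the matches remain
    return "->".join(value for value in flat if value is not None)

print(lookup_via_nested_transform("US->UK->IN", maps))
print(lookup_via_nested_transform("UK->IN->CH", maps))
```

Each code is probed against every map, so the work grows with the number of legs times the number of mapping rows. That is fine for a small lookup table but worth keeping in mind for a large one.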

Answer #4 (sg24os4d)

Using the aggregate() and transform() higher-order functions (HOFs):
Input 1:

val df1=spark.sql(""" with t1 (
select 1 c1 , 'US->UK->IN'  c2 union all
select 2 c1 , 'UK->IN->CH' c2 )
select c1 id, c2 journey from t1 
"""
)
df1.show(false)
df1.createOrReplaceTempView("df1")

+---+----------+
|id |journey   |
+---+----------+
|1  |US->UK->IN|
|2  |UK->IN->CH|
+---+----------+

Input 2:

val df2=spark.sql(""" with t1 (
select 'US' c1 , 'United States'  c2 union all
select 'IN' c1 , 'India'  c2 union all
select 'MY' c1 , 'Malaysia'  c2 union all
select 'UK' c1 , 'United kingdom'  c2 union all
select 'CH' c1 , 'China'  c2
)
select c1 code, c2 country from t1 
"""
)

df2.show(false)
df2.createOrReplaceTempView("df2")

+----+--------------+
|code|country       |
+----+--------------+
|US  |United States |
|IN  |India         |
|MY  |Malaysia      |
|UK  |United kingdom|
|CH  |China         |
+----+--------------+

Using the HOFs:

spark.sql("""
 with t1 ( select map(code, country) m1 from df2 ),
      t2 ( select collect_list(m1) m2 from t1 ) ,
      t3 ( select aggregate(m2, cast(map() as map<string,string>), (acc,i) ->  map_concat(acc,i)) m3 from t2 ),
      t4 ( select *, transform( split(journey,'->'), x -> (select m3 from t3)[x] )  j1 from df1 )
      select id, journey, j1, concat_ws('->',j1) j2 from t4 
""").show(false)

+---+----------+--------------------------------------+------------------------------------+
|id |journey   |j1                                    |j2                                  |
+---+----------+--------------------------------------+------------------------------------+
|1  |US->UK->IN|[United States, United kingdom, India]|United States->United kingdom->India|
|2  |UK->IN->CH|[United kingdom, India, China]        |United kingdom->India->China        |
+---+----------+--------------------------------------+------------------------------------+
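
The aggregate() step folds the list of single-entry maps into one lookup map, starting from an empty map. A minimal plain-Python sketch of that fold, using `functools.reduce` as the analogue, might look like the following. The variable names simply mirror the CTE aliases; this is a model of the logic rather than Spark code.

```python
from functools import reduce

# m2: the collected list of single-entry maps
m2 = [{"US": "United States"}, {"IN": "India"}, {"MY": "Malaysia"},
      {"UK": "United kingdom"}, {"CH": "China"}]

# aggregate(m2, map(), (acc, i) -> map_concat(acc, i))
m3 = reduce(lambda acc, i: {**acc, **i}, m2, {})

# transform(split(journey, '->'), x -> m3[x])
j1 = [m3.get(x) for x in "US->UK->IN".split("->")]

# concat_ws('->', j1), skipping missing values
j2 = "->".join(v for v in j1 if v is not None)
print(j1)
print(j2)
```

One difference to keep in mind: merging Python dicts silently overwrites a repeated key, whereas how Spark treats duplicate keys in map_concat depends on the version and configuration, so this sketch assumes the codes are unique.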

You can use struct() instead of map(), which lets you replace aggregate() with map_from_entries():

spark.sql("""
 with t1 ( select struct(code, country) m1 from df2 ),
      t2 ( select collect_list(m1) m2 from t1 ) ,
      t3 ( select map_from_entries(m2) m3 from t2 ),
      t4 ( select *, transform( split(journey,'->'), x -> (select m3 from t3)[x] )  j1 from df1 )
      select id, journey, j1, concat_ws('->',j1) j2 from t4 
""").show(false)

+---+----------+--------------------------------------+------------------------------------+
|id |journey   |j1                                    |j2                                  |
+---+----------+--------------------------------------+------------------------------------+
|1  |US->UK->IN|[United States, United kingdom, India]|United States->United kingdom->India|
|2  |UK->IN->CH|[United kingdom, India, China]        |United kingdom->India->China        |
+---+----------+--------------------------------------+------------------------------------+
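
For comparison, the struct-based variant corresponds roughly to building a dictionary straight from a list of (key, value) pairs, with no fold needed. The sketch below is a plain-Python analogue with hypothetical variable names that mirror the CTE aliases.

```python
# m2: collect_list(struct(code, country)) modelled as a list of pairs
m2 = [("US", "United States"), ("IN", "India"), ("MY", "Malaysia"),
      ("UK", "United kingdom"), ("CH", "China")]

# map_from_entries(m2): pairs become one lookup map in a single step
m3 = dict(m2)

# transform + concat_ws, as in the previous query
j1 = [m3.get(x) for x in "UK->IN->CH".split("->")]
j2 = "->".join(v for v in j1 if v is not None)
print(j2)
```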
