我是新的Spark和奋斗做到这一点。我在两个 Dataframe (Journey,country_code mappinng)中输入了如下内容,并需要生成另一个 Dataframe ,如预期结果。
旅程:-
| ID|旅程|
| --|--|
| 1 |美国->英国->印度|
| 2 |英国->IN->CH|
国家代码Map:-
| 代码|国家|
| --|--|
| 美国|美国|
| 在|印度|
| 我|马来西亚|
| 英国|联合 Realm |
| CH|中国|
预期输出:-
| ID|旅程|旅行_LongName|
| --|--|--|
| 1 |美国->英国->印度|美国->英国->印度|
| 2 |英国->IN->CH|英国->印度->中国|
国家Map是动态的,旅程的顺序不应该在Journey_LongName字段中改变。如果你们中的任何人解决了这个问题或有想法,请分享您的意见。谢谢Dhana
4条答案
按热度按时间8fq7wneg1#
毕竟这只是一个连接,但棘手的部分是保持秩序。我使用
transform
来保持索引,以便可以维护顺序。最后,再次使用transform
丢弃索引。在这里,两个帧被称为
journey
和country
。结果称为df
。pgvzfuti2#
使用spark-sql:
给定输入-1
输入-2:
启用交叉联接
在spark.sql中使用posexplode、collect_list和array_sort
iyr7buue3#
另一种解决方案是使用嵌套的transform()高阶函数,该函数在spark 2.4及以后版本中可用
通过移动collect_list以转换输入,可以进一步缩短查询
sg24os4d4#
使用aggregate()和transform()HOF:
输入-1:
输入-2:
使用HOF
您可以使用struct()而不是map(),从而将aggregate()替换为map_from_entries()