我正在尝试读取spark2.4中的.txt文件并将其加载到dataframe。文件数据如下所示:-
一个经理手下有许多雇员
Manager_21: Employee_575,Employee_2703,
Manager_11: Employee_454,Employee_158,
Manager_4: Employee_1545,Employee_1312
我在scala spark 2.4中编写的代码:-
val df = spark.read
.format("csv")
.option("header", "true") //first line in file has headers
.option("mode", "DROPMALFORMED")
.load("D:/path/myfile.txt")
df.printSchema()
不幸的是,在打印模式时,所有员工都可以在单个经理21下看到。
root
|-- Manager_21: servant_575: string (nullable = true)
|-- Employee_454: string (nullable = true)
|-- Employee_1312 string (nullable = true)
....... ...... 等
我不确定是否有可能在斯卡拉星火。。。。
预期产量:
同一列中经理的所有雇员。例如:21号经理有两名员工,他们都在同一列。或者我们怎样才能看到哪个员工都在一个特定的经理手下。
Manager_21 |Manager_11 |Manager_4
Employee_575 |Employee_454 |Employee_1545
Employee_2703|Employee_158|Employee_1312
有没有其他办法。。。。。请建议
谢谢
1条答案
按热度按时间pexxcrt21#
尝试使用
spark.read.text
然后使用groupBy
以及.pivot
以得到期望的结果。Example:
```val df=spark.read.text("")
df.show(10,false)
//+--------------------------------------+
//|value |
//+--------------------------------------+
//|Manager_21: Employee_575,Employee_2703|
//|Manager_11: Employee_454,Employee_158 |
//|Manager_4: Employee_1545,Employee_1312|
//+--------------------------------------+
import org.apache.spark.sql.functions._
df.withColumn("mid",monotonically_increasing_id).
withColumn("col1",split(col("value"),":")(0)).
withColumn("col2",split(split(col("value"),":")(1),",")).
groupBy("mid").
pivot(col("col1")).
agg(min(col("col2"))).
select(max("Manager_11").alias("Manager_11"),max("Manager_21").alias("Manager_21") ,max("Manager_4").alias("Manager_4")).
selectExpr("explode(arrays_zip(Manager_11,Manager_21,Manager_4))").
select("col.*").
show()
//+-------------+-------------+--------------+
//| Manager_11| Manager_21| Manager_4|
//+-------------+-------------+--------------+
//| Employee_454| Employee_575| Employee_1545|
//| Employee_158|Employee_2703| Employee_1312|
//+-------------+-------------+--------------+
`UPDATE:`
val df=spark.read.text("")
val df1=df.withColumn("mid",monotonically_increasing_id).
withColumn("col1",split(col("value"),":")(0)).
withColumn("col2",split(split(col("value"),":")(1),",")).
groupBy("mid").
pivot(col("col1")).
agg(min(col("col2"))).
select(max("Manager_11").alias("Manager_11"),max("Manager_21").alias("Manager_21") ,max("Manager_4").alias("Manager_4")).
selectExpr("explode(arrays_zip(Manager_11,Manager_21,Manager_4))")
//create temp table
df1.createOrReplaceTempView("tmp_table")
sql("select col.* from tmp_table").show(10,false)
//+-------------+-------------+--------------+
//|Manager_11 |Manager_21 |Manager_4 |
//+-------------+-------------+--------------+
//| Employee_454| Employee_575| Employee_1545|
//|Employee_158 |Employee_2703|Employee_1312 |
//+-------------+-------------+--------------+