python—将sparkDataframe行的值按比例分配给其他行

bybem2ql  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(407)

当我在team列中有公共值时,我必须在参与相同销售(id\u sales)的团队之间按比例共享这个公共值。

+---------------+----------------+----------------+
|id_sales       |team            |price           |
+---------------+----------------+----------------+
|101            |Data Engineering|             200|
|102            |       Front-End|             300|
|103            |  Infrastructure|             100|
|103            |        Software|             200|
|103            |          Commum|             800|
|104            |    Data Science|             500|
+---------------+----------------+----------------+

例如:在上面的表中,我在id\u sales=103中得到了公共值,因此我必须计算每个团队的公共值:-基础设施:100-软件:200,因此基础设施是1/3*(800),软件是2/3*(800),因此在最后,我的表将如下所示:

+---------------+----------------+----------------+
|id_sales       |team            |price           |
+---------------+----------------+----------------+
|101            |Data Engineering|             200|
|102            |       Front-End|             300|
|103            |  Infrastructure|          366,66|
|103            |        Software|          733,66|
|104            |    Data Science|             500|
+---------------+----------------+----------------+

有人能给我一些建议吗?提示可以是python或scala(spark2.4)。
创建此表的代码:
Pypark公司

spark_df = spark.createDataFrame( \
[ \
  ("101", "Data Engineering", "200"),
  ("102", "Front-End", "300"),
  ("103", "Infrastructure", "100"),
  ("103", "Software", "200"),
  ("103", "Commum", "800"),
  ("104", "Data Science", "500") \
],
["id_sales", "team", "price"])

Spark鳞片

val spark_df = Seq(
  ("101", "Data Engineering", "200"),
  ("102", "Front-End", "300"),
  ("103", "Infrastructure", "100"),
  ("103", "Software", "200"),
  ("103", "Commum", "800"),
  ("104", "Data Science", "500")
).toDF("id_sales", "team", "price")

谢谢:)

p5fdfcr1

p5fdfcr11#

试试这个:

scala> val df = Seq(
     |   ("101", "Data Engineering", "200"),
     |   ("102", "Front-End", "300"),
     |   ("103", "Infrastructure", "100"),
     |   ("103", "Software", "200"),
     |   ("103", "Common", "800"),
     |   ("104", "Data Science", "500")
     | ).toDF("id_sales", "team", "price")
df: org.apache.spark.sql.DataFrame = [id_sales: string, team: string ... 1 more field]

scala> df.show
+--------+----------------+-----+
|id_sales|            team|price|
+--------+----------------+-----+
|     101|Data Engineering|  200|
|     102|       Front-End|  300|
|     103|  Infrastructure|  100|
|     103|        Software|  200|
|     103|          Common|  800|
|     104|    Data Science|  500|
+--------+----------------+-----+

scala> val commonDF = df.filter("team='Common'")
commonDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id_sales: string, team: string ... 1 more field]

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val ww = Window.partitionBy("id_sales")
ww: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@43324745

scala> val finalDF = df.as("main").filter("team<>'Common'").withColumn("weight",col("price")/sum("price").over(ww)).join(commonDF.as("common"), Seq("id_sales"),"left").withColumn("updated_price",when(col("common.price").isNull,df("price")).otherwise(df("price")+col("weight")*col("common.price"))).select($"id_sales",$"main.team",$"updated_price".as("price"))
finalDF: org.apache.spark.sql.DataFrame = [id_sales: string, team: string ... 1 more field]

scala> finalDF.show
+--------+----------------+------------------+
|id_sales|            team|             price|
+--------+----------------+------------------+
|     101|Data Engineering|               200|
|     104|    Data Science|               500|
|     102|       Front-End|               300|
|     103|        Software| 733.3333333333333|
|     103|  Infrastructure|366.66666666666663|
+--------+----------------+------------------+

相关问题