sparksql:从一个Dataframe中减去另一个Dataframe的相应行

ss2ws0br  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(794)

我有两个具有相同模式的Dataframe

df1
    col1   col2
    23     89
df2
    col1   col2
    57     72

我想从df2行中减去df1。所以我想看看

result
col1   col2
34     -3

如何在spark sql中实现这一点?

cl25kdpy

cl25kdpy1#

此版本对df中不同数量的列执行此操作。只是需要更新一下 val n 到df中的列数。
它回答发问者提出的不属于原始问题的后续问题。

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType, ArrayType, LongType}
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column

val suffix = "_2"

def renameCols(t: DataFrame, suffix: String = ""): DataFrame = {
  t.select( t.columns.map { c => t.col(c).as( c + suffix) } : _* )
}

// Initial data
val df1 = sc.parallelize(Seq( (11.0, 22.0, 33.0, 44.0, 55.0), (22.0, 33.0, 44.0, 55.0, 66.0))).toDF("c1", "c2", "c3", "c4", "c5")
val df2 = sc.parallelize(Seq( (1.0, 2.0, 3.0, 4.0, 5.0), (2.0, 3.0, 4.0, 5.0, 6.0))).toDF("c1", "c2", "c3", "c4", "c5")
val newSchema = StructType(df1.schema.fields ++ Array(StructField("rowId", LongType, false)))

val rddWithId1 = df1.rdd.zipWithIndex
val rddWithId2 = df2.rdd.zipWithIndex
val X = spark.createDataFrame(rddWithId1.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema) 
val Y = spark.createDataFrame(rddWithId2.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema) 
val dfA = renameCols(X, "_1")
val dfB = renameCols(Y, "_1_2")

val df = dfA.join(dfB, dfA("rowId_1") === dfB("rowId_1_2")) 
//df.show(false)

val res = df.select(df.columns.filter(_.endsWith("_1")).map(c => col(c) - col(c+s"${suffix}")): _*).drop("(rowId_1 - rowId_1_2)")

// Number of cols generated, could alo be automated
val n = 5
val newColumns = Seq.range(1,n+1).map(c => ("c_" + c))
res.toDF(newColumns:_*).show(false)

退货:

+----+----+----+----+----+
|c_1 |c_2 |c_3 |c_4 |c_5 |
+----+----+----+----+----+
|10.0|20.0|30.0|40.0|50.0|
|20.0|30.0|40.0|50.0|60.0|
+----+----+----+----+----+
x4shl7ld

x4shl7ld2#

另一个更难的方法,但可能是更好的性能明智和严格地说是正确的。显示了df api中常见名称的一些问题。
zipwithindex方法保留了顺序。
重点仍然是,如果成百上千的col呢?

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType, ArrayType, LongType}

def renameCols(t: DataFrame, suffix: String = ""): DataFrame = {
  t.select( t.columns.map { c => t.col(c).as( c + suffix) } : _* )
}

// Initial data
val df1 = sc.parallelize(Seq( (1.0, 2.0), (4.0, 2.0))).toDF("c1", "c2")
val df2 = sc.parallelize(Seq( (1.0, 3.0), (1.0, 2.0))).toDF("c1", "c2")

val newSchema = StructType(df1.schema.fields ++ Array(StructField("rowId", LongType, false)))

val rddWithId1 = df1.rdd.zipWithIndex
val rddWithId2 = df2.rdd.zipWithIndex
val X = spark.createDataFrame(rddWithId1.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema) 
val Y = spark.createDataFrame(rddWithId2.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema) 
val dfA = renameCols(X, "_1")
val dfB = renameCols(Y, "_2")

val df = dfA.join(dfB, dfA("rowId_1") === dfB("rowId_2")) 
df.show(false)
df.selectExpr("c1_1 - c1_2 as c1", "c2_1 - c2_2 as c2").show(false)

附录
对于许多列,是一个开始,而不是答案的一部分:

df.select(df.columns.filter(_.endsWith("_1")).map(c => col(c) + 1): _*).show

对带lit&SUBTERATION和rowid DROPTION的_1和_2列应用适当的函数。值得深思的好例子。

nvbavucw

nvbavucw3#

下面的代码可能会有所帮助,

import org.apache.spark.sql.expressions.Window

val df1 = Seq((23,89)).toDF("col1","col2")

val df2 = Seq((57,72)).toDF("col1","col2")

val windowSpec  = Window.partitionBy(lit("A")).orderBy(lit("A"))

val df3=df1.withColumn("id",row_number.over(windowSpec))
val df4=df2.withColumn("id",row_number.over(windowSpec))

df3.createOrReplaceTempView("df3")
df4.createOrReplaceTempView("df4")

spark.sql("SELECT a.col1-b.col1 as col1,a.col2-b.col2 as col2 FROM df4 a INNER JOIN df3 b ON a.id=b.id").show()

/*
+----+----+
|col1|col2|
+----+----+
|  34| -17|
+----+----+

* /

相关问题