Dynamically creating a DataFrame in Spark Scala

soat7uwm · posted 2021-05-29 in Hadoop

I have several columns of data coming from DataFrame1, inside a loop (from different rows). I want to create a dataframe2 out of all of that row/column data.
Below is some sample data; I tried using a Seq:

var DF1 = Seq(
  ("11111111", "0101","6573","X1234",12763),
  ("44444444", "0148","8382","Y5678",-2883),
  ("55555555", "0154","5240","Z9011", 8003))

I want to append the two dynamic rows below to the Seq above and then use the final Seq to create a DataFrame.

("88888888", "1333","7020","DEF34",500)
  ("99999999", "1333","7020","GHI56",500)

The final Seq or DataFrame should look like this:

var DF3 = Seq(
      ("11111111", "0101","6573","X1234",12763),
      ("44444444", "0148","8382","Y5678",-2883),
      ("55555555", "0154","5240","Z9011", 8003),
      ("88888888", "1333","7020","DEF34",500),
      ("99999999", "1333","7020","GHI56",500))

I tried the code below, using a Seq and creating a case class to use where possible. The problem is that when a new row is appended to the Seq, it returns a new Seq with the row added rather than updating the original. How do I get the updated Seq with the new rows in it? If a Seq is not the right choice, would an ArrayBuffer be a good idea?

case class CreateDFTestCaseClass(ACCOUNT_NO: String, LONG_IND: String, SHORT_IND: String,SECURITY_ID: String, QUANTITY: Integer)
  val sparkSession = SparkSession
    .builder()
    .appName("AllocationOneViewTest")
    .master("local")
    .getOrCreate()
  val sc = sparkSession.sparkContext
  import sparkSession.sqlContext.implicits._
  def main(args: Array[String]): Unit = {
    var acctRulesPosDF = Seq(
      ("11111111", "0101","6573","X1234",12763),
      ("44444444", "0148","8382","Y5678",-2883),
      ("55555555", "0154","5240","Z9011", 8003))
    acctRulesPosDF:+ ("88888888", "1333","7020","DEF34",500)
    acctRulesPosDF:+ ("99999999", "1333","7020","GHI56",500))
    var DF3 = acctRulesPosDF.toDF
    DF3.show()
  }

o2gm4chl1#

It's not the most elegant approach, but to keep the code as similar to your original as possible, just assign the result back to the variable:

var acctRulesPosDF = Seq(
      ("11111111", "0101","6573","X1234",12763),
      ("44444444", "0148","8382","Y5678",-2883),
      ("55555555", "0154","5240","Z9011", 8003))
    acctRulesPosDF = acctRulesPosDF:+ ("88888888", "1333","7020","DEF34",500)
    acctRulesPosDF = acctRulesPosDF:+ ("99999999", "1333","7020","GHI56",500)

A quick example in the Spark shell:

scala>  var acctRulesPosDF = Seq(
     |       ("11111111", "0101","6573","X1234",12763),
     |       ("44444444", "0148","8382","Y5678",-2883),
     |       ("55555555", "0154","5240","Z9011", 8003))
acctRulesPosDF: Seq[(String, String, String, String, Int)] = List((11111111,0101,6573,X1234,12763), (44444444,0148,8382,Y5678,-2883), (55555555,0154,5240,Z9011,8003))

scala>     acctRulesPosDF = acctRulesPosDF:+ ("88888888", "1333","7020","DEF34",500)
acctRulesPosDF: Seq[(String, String, String, String, Int)] = List((11111111,0101,6573,X1234,12763), (44444444,0148,8382,Y5678,-2883), (55555555,0154,5240,Z9011,8003), (88888888,1333,7020,DEF34,500))

scala>     acctRulesPosDF = acctRulesPosDF:+ ("99999999", "1333","7020","GHI56",500)
acctRulesPosDF: Seq[(String, String, String, String, Int)] = List((11111111,0101,6573,X1234,12763), (44444444,0148,8382,Y5678,-2883), (55555555,0154,5240,Z9011,8003), (88888888,1333,7020,DEF34,500), (99999999,1333,7020,GHI56,500))

scala> var DF3 = acctRulesPosDF.toDF
DF3: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 3 more fields]

scala>     DF3.show()
+--------+----+----+-----+-----+
|      _1|  _2|  _3|   _4|   _5|
+--------+----+----+-----+-----+
|11111111|0101|6573|X1234|12763|
|44444444|0148|8382|Y5678|-2883|
|55555555|0154|5240|Z9011| 8003|
|88888888|1333|7020|DEF34|  500|
|99999999|1333|7020|GHI56|  500|
+--------+----+----+-----+-----+
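
A side note (my own addition, not part of the original answer): calling toDF with no arguments on a Seq of tuples produces the default column names _1 .. _5 shown above. To get the column names from the case class in the question, one option is to pass them to toDF explicitly:

// Hypothetical variation: name the columns explicitly instead of _1 .. _5
val namedDF = acctRulesPosDF.toDF("ACCOUNT_NO", "LONG_IND", "SHORT_IND", "SECURITY_ID", "QUANTITY")
namedDF.show()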

flvlnr442#

The reason you get the same old Seq even after adding new rows is that the Seq imported by default is of type scala.collection.immutable.Seq, which never changes: :+ always returns a new collection and leaves the original untouched. So either switch to a growable mutable collection from scala.collection.mutable (an ArrayBuffer, as you suggested, works well for appending), or assign the result back to the variable as @scouto suggested in the other answer.
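
For completeness, here is a minimal sketch of the ArrayBuffer route the question asked about (my own illustration, assuming the same SparkSession and implicits import as in the question's code):

import scala.collection.mutable.ArrayBuffer

val rows = ArrayBuffer(
  ("11111111", "0101","6573","X1234",12763),
  ("44444444", "0148","8382","Y5678",-2883),
  ("55555555", "0154","5240","Z9011", 8003))

// += appends in place, so no reassignment is needed
rows += (("88888888", "1333","7020","DEF34",500))
rows += (("99999999", "1333","7020","GHI56",500))

val DF3 = rows.toSeq.toDF()  // toSeq hands toDF an immutable Seq
DF3.show()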
