scala:从dataframe.foreach访问/编辑Map

lymnna71  于 2021-07-14  发布在  Java
关注(0)|答案(0)|浏览(289)

我对scala还比较陌生,所以任何提示/基本信息都非常受欢迎。我正在尝试从dataframe.foreach中访问和编辑可变Map,但我无法做到这一点。
现在我了解了spark如何在多个执行器上执行,数据如何复制到每个节点上进行计算。所以我在网上搜索得到了collectionaccumulator类,它可以帮助跨节点持久化集合。
我的代码

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import org.joda.time.DateTime

import org.apache.spark.sql.{DataFrame, Row} 
import spark.sparkContext._

import org.apache.spark.{AccumulableParam, SparkConf}
import org.apache.spark.serializer.JavaSerializer
import scala.collection.mutable.{ HashMap => MutableHashMap }

var m  =  scala.collection.mutable.Map("AL" -> "Alabama")

// this creates a collection(list) of accumulator 
var mutableMapAcc= spark.sparkContext.collectionAccumulator[scala.collection.mutable.Map[String,String]]("mutableMap") 
mutableMapAcc.add(  scala.collection.mutable.Map("defaultKey" -> "defaultValue"))

var _mutableMap = scala.collection.mutable.Map("mmap" -> "mmapvalue")

val df = Seq(
          ("Andy","a1", 20,new DateTime().toString()),     
          ("Berta","b1", 30,new DateTime().toString()),
          ("Joe","j1", 40,new DateTime().toString())).toDF("name","sector","age","AsOfDate")

println("===================================before foreach======================================================")
println(mutableMapAcc)
println("=========================================================================================")
df.foreach { row =>
 println(mutableMapAcc.value.size)
 mutableMapAcc.add(scala.collection.mutable.Map( row(0).toString() -> row(1).toString() ) )
 println(mutableMapAcc.value) 
}
println("===================================after foreach======================================================")
println(mutableMapAcc)

输出:

每次我把可变mapacc的大小设为零。我想访问我在语句中添加的第一个Map(下面)

mutableMapAcc.add(  scala.collection.mutable.Map("defaultKey" -> "defaultValue"))

从foreach循环,然后使其行为类似于字典(就像我们在c#中所做的那样),并添加到字典中,如下所示:

mutableMapAcc.value.get(0) += row(0).toString() -> row(1).toString()

另外,我知道这样的数据结构必须是轻量级的,因为每次对Map进行读写时(在我的例子中)都会广播到所有节点,这对于我的用例来说是可以的。如果spark可以帮我做的话,我只想避免引入其他服务(比如redis/anydb)。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题