我对scala还比较陌生,所以任何提示/基本信息都非常受欢迎。我正在尝试从dataframe.foreach中访问和编辑可变Map,但我无法做到这一点。
现在我了解了spark如何在多个执行器上执行,数据如何复制到每个节点上进行计算。所以我在网上搜索得到了collectionaccumulator类,它可以帮助跨节点持久化集合。
我的代码
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import org.joda.time.DateTime
import org.apache.spark.sql.{DataFrame, Row}
import spark.sparkContext._
import org.apache.spark.{AccumulableParam, SparkConf}
import org.apache.spark.serializer.JavaSerializer
import scala.collection.mutable.{ HashMap => MutableHashMap }
var m = scala.collection.mutable.Map("AL" -> "Alabama")
// this creates a collection(list) of accumulator
var mutableMapAcc= spark.sparkContext.collectionAccumulator[scala.collection.mutable.Map[String,String]]("mutableMap")
mutableMapAcc.add( scala.collection.mutable.Map("defaultKey" -> "defaultValue"))
var _mutableMap = scala.collection.mutable.Map("mmap" -> "mmapvalue")
val df = Seq(
("Andy","a1", 20,new DateTime().toString()),
("Berta","b1", 30,new DateTime().toString()),
("Joe","j1", 40,new DateTime().toString())).toDF("name","sector","age","AsOfDate")
println("===================================before foreach======================================================")
println(mutableMapAcc)
println("=========================================================================================")
df.foreach { row =>
println(mutableMapAcc.value.size)
mutableMapAcc.add(scala.collection.mutable.Map( row(0).toString() -> row(1).toString() ) )
println(mutableMapAcc.value)
}
println("===================================after foreach======================================================")
println(mutableMapAcc)
输出:
每次我把可变mapacc的大小设为零。我想访问我在语句中添加的第一个Map(下面)
mutableMapAcc.add( scala.collection.mutable.Map("defaultKey" -> "defaultValue"))
从foreach循环,然后使其行为类似于字典(就像我们在c#中所做的那样),并添加到字典中,如下所示:
mutableMapAcc.value.get(0) += row(0).toString() -> row(1).toString()
另外,我知道这样的数据结构必须是轻量级的,因为每次对Map进行读写时(在我的例子中)都会广播到所有节点,这对于我的用例来说是可以的。如果spark可以帮我做的话,我只想避免引入其他服务(比如redis/anydb)。
暂无答案!
目前还没有任何答案,快来回答吧!