我试图在一个单例scala对象中设置一个到redis的连接池,以便在Mapdf的分区时可以读/写redis。我希望在运行main方法时能够配置主机以及其他连接池变量。但是,当前的配置没有给我配置的redis\u主机,而是给我localhost。
写这篇文章时我提到https://able.bio/patrickcording/sharing-objects-in-spark--58x4gbf 每个执行器节一个示例。
在为每个执行器维护一个redisclient示例的同时,实现配置主机的最佳方法是什么?
object Main {
def main(args: Array[String]): Unit = {
val parsedConfig = ConfigFactory.parseFile(new File(args(0)))
val config = ConfigFactory.load(parsedConfig)
RedisClient.host = config.getString("REDIS_HOST")
val Main = new Main()
Main.runMain()
}
}
class Main{
val df = Seq(...).toDF()
df.mapPartitions(partitions => {
partitions.foreach(row => {
val count = RedisClient.getIdCount(row.getAs("id").asInstanceOf[String])
//do something
})
})
df.write.save
RedisClient.close()
}
object RedisClient {
var host: String = "localhost"
private val pool = new RedisClientPool(host, 6379)
def getIdCount(id: String):Option[String] = {
pool.withClient(client => {
client.get(orderLineId)
})
}
def close(): Unit = {
pool.close()
}
}
1条答案
按热度按时间rjjhvcjd1#
在Spark中,
main
只在驱动程序上运行,不在执行器上运行。RedisClient
在调用调用它的方法之前,不能保证它存在于任何给定的执行器上,并且只会使用默认值初始化它。因此,确保它拥有正确主机的唯一方法是,在相同的rdd/df操作中,确保
host
已设置,例如:当然,因为
main
如果不在驱动程序上运行,则可能还需要将配置广播给执行器:那你就通过了
broadcastConfig
周围和使用broadcastConfig.value
代替config
,则上述内容将变为:只要你注意总是给
RedisClient.host
在做其他事情之前先把它调好RedisClient
,你应该安全。