//work split spark.parallelize(1, 10k).map(item => doTask(i)).collect()
在这里,我需要在dotask(i)的数据库中进行一些日志记录。序列化dbmanager并发送到worker节点是不容易的。spark是否还可以向驱动程序节点指示taski已完成,然后登录驱动程序节点?
5cnsuln71#
我可以想出三个选择:
在map函数内(在方法内的示例中 doTask )你可以示例化 dbManager 并运行日志代码。这个 dbManager 将在executor/worker节点上创建。但是,此选项将导致spark作业的性能非常差,因为将为spark的每个元素创建一个db连接 rdd .
doTask
dbManager
rdd
只使用一个分区 dbManager 每个分区的 rdd 将在executor/worker节点上创建。实际上,在spark任务中创建db连接是mappartitions的典型用例。
val result = spark.sparkContext.parallelize(1 to 10000).mapPartitions(it => { //initalize and use database connection here for( item <- it) yield { doTask(item) } }).collect()
根据rdd的大小和分区的数量(或者找到一个好的分区器的可能性),这个选项将提供良好的性能特性。
第三种选择是使用spark之外的消息传递技术,例如kafka。
1条答案
按热度按时间5cnsuln71#
我可以想出三个选择:
Map
在map函数内(在方法内的示例中
doTask
)你可以示例化dbManager
并运行日志代码。这个dbManager
将在executor/worker节点上创建。但是,此选项将导致spark作业的性能非常差,因为将为spark的每个元素创建一个db连接rdd
.Map分区
只使用一个分区
dbManager
每个分区的rdd
将在executor/worker节点上创建。实际上,在spark任务中创建db连接是mappartitions的典型用例。根据rdd的大小和分区的数量(或者找到一个好的分区器的可能性),这个选项将提供良好的性能特性。
Kafka或类似
第三种选择是使用spark之外的消息传递技术,例如kafka。