我想在spark的foreachparition中执行mysql查询,最终将所有查询结果都放到一个Dataframe中。它看起来像:
var rowAccumulator: RowAccumulator = new RowAccumulator
foreachPartition((p) => {
val result = MysqlService.getData(query, p)
rowAccumulator.add(result)
})
然后转换 rowAccumulator
到Dataframe。
但是,它运行缓慢。例如,第一个查询需要130ms,第20个查询可能需要150000ms。我注意到在mysqlservice中,我每次都创建db会话,这可能不正确。有没有更好的办法?
更新:mysqlservice在不同的地方使用,我们希望使代码易于维护。如果它不能很好地执行,我们可以应用一种不同的方法来执行查询,比如使用sparkjdbc。我很好奇是什么原因使这个查询运行缓慢。
1条答案
按热度按时间h5qlskok1#
Spark蓄能器不是为处理大量数据而设计的。它们主要用于使用在常量内存中操作的方法(如计数器)收集辅助统计信息。
像这样使用累加器是一种效率较低的
collect
(不是那样的collect
是推荐的)如果你将行累加器转换为Dataframe。
既然您使用的是mysql数据库,那么首先应该看看spark的jdbc连接器:
只有当您有非常特殊的需求时,才可以使用自定义代码。如果您直接用
map
```rdd.foreachPartition((p) => {
MysqlService.getData(query, p)
}).map(x => anyRequiredTransformation(x)).toDF