在foreachpartition中执行mysql查询运行缓慢

xytpbqjk  于 2021-06-21  发布在  Mysql
关注(0)|答案(1)|浏览(345)

我想在spark的foreachparition中执行mysql查询,最终将所有查询结果都放到一个Dataframe中。它看起来像:

var rowAccumulator: RowAccumulator = new RowAccumulator

foreachPartition((p) => {
  val result = MysqlService.getData(query, p)
  rowAccumulator.add(result)
})

然后转换 rowAccumulator 到Dataframe。
但是,它运行缓慢。例如,第一个查询需要130ms,第20个查询可能需要150000ms。我注意到在mysqlservice中,我每次都创建db会话,这可能不正确。有没有更好的办法?
更新:mysqlservice在不同的地方使用,我们希望使代码易于维护。如果它不能很好地执行,我们可以应用一种不同的方法来执行查询,比如使用sparkjdbc。我很好奇是什么原因使这个查询运行缓慢。

h5qlskok

h5qlskok1#

Spark蓄能器不是为处理大量数据而设计的。它们主要用于使用在常量内存中操作的方法(如计数器)收集辅助统计信息。
像这样使用累加器是一种效率较低的 collect (不是那样的 collect 是推荐的)如果你
将行累加器转换为Dataframe。
既然您使用的是mysql数据库,那么首先应该看看spark的jdbc连接器:

spark.read.jdbc(...)

只有当您有非常特殊的需求时,才可以使用自定义代码。如果您直接用 map ```
rdd.foreachPartition((p) => {
MysqlService.getData(query, p)
}).map(x => anyRequiredTransformation(x)).toDF

相关问题