如何用spark提高kudu的阅读能力?

fv2wmkja  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(443)

我有一个过程,给定一个新的输入,从kudu数据库中检索相关信息,然后进行一些计算。
问题在于数据检索,我们有1.201.524.092行,对于任何计算,都需要永远开始处理所需的行,因为读者需要将所有内容都交给spark。
要阅读Kudu的表格,我们需要:

def read(tableName: String): Try[DataFrame] = {
        val kuduOptions: Map[String, String] = Map(
                "kudu.table" -> tableName,
                "kudu.master" -> kuduContext.kuduMaster)

        SQLContext.read.options(kuduOptions).format("kudu").load

}

然后:

val newInputs = ??? // Dataframe with the new inputs
val currentInputs = read("inputsTable") // This takes too much time!!!!

val relatedCurrent = currentInputs.join(newInputs.select("commonId", Seq("commonId"), "inner")

doThings(newInputs, relatedCurrent)

例如,我们只想引入一个新的输入。好吧,它必须扫描整个表来找到currentinputs,这会导致81.6gb/1201524092行的无序写入。
我该如何改进?
谢谢,

insrf1ej

insrf1ej1#

您可以收集新的输入,然后在where子句中使用它。使用这种方法可以很容易地命中oom,但它可以使查询非常快,因为这将有利于 predicate 下推

val collectedIds = newInputs.select("commonId").collect
val filtredCurrentInputs = currentInputs.where($"commonId".isin(collectedIds))

相关问题