我有一个过程,给定一个新的输入,从kudu数据库中检索相关信息,然后进行一些计算。
问题在于数据检索,我们有1.201.524.092行,对于任何计算,都需要永远开始处理所需的行,因为读者需要将所有内容都交给spark。
要阅读Kudu的表格,我们需要:
def read(tableName: String): Try[DataFrame] = {
val kuduOptions: Map[String, String] = Map(
"kudu.table" -> tableName,
"kudu.master" -> kuduContext.kuduMaster)
SQLContext.read.options(kuduOptions).format("kudu").load
}
然后:
val newInputs = ??? // Dataframe with the new inputs
val currentInputs = read("inputsTable") // This takes too much time!!!!
val relatedCurrent = currentInputs.join(newInputs.select("commonId", Seq("commonId"), "inner")
doThings(newInputs, relatedCurrent)
例如,我们只想引入一个新的输入。好吧,它必须扫描整个表来找到currentinputs,这会导致81.6gb/1201524092行的无序写入。
我该如何改进?
谢谢,
1条答案
按热度按时间insrf1ej1#
您可以收集新的输入,然后在where子句中使用它。使用这种方法可以很容易地命中oom,但它可以使查询非常快,因为这将有利于 predicate 下推