我有一个场景,其中一定数量的操作(包括groupby)必须应用于许多小文件(每个文件约300mb)。手术看起来像这样。。 df.groupBy(....).agg(....)
现在要在多个文件上处理它,我可以使用通配符“/**/*.csv”,但是,它会创建一个rdd并将其分区以用于操作。但是,从操作上看,这是一个分组操作,并且涉及大量的洗牌,如果文件相互排斥,则洗牌是不必要的。
我所看到的是,我可以在文件上创建独立的rdd并独立地对它们进行操作。
我有一个场景,其中一定数量的操作(包括groupby)必须应用于许多小文件(每个文件约300mb)。手术看起来像这样。。 df.groupBy(....).agg(....)
现在要在多个文件上处理它,我可以使用通配符“/**/*.csv”,但是,它会创建一个rdd并将其分区以用于操作。但是,从操作上看,这是一个分组操作,并且涉及大量的洗牌,如果文件相互排斥,则洗牌是不必要的。
我所看到的是,我可以在文件上创建独立的rdd并独立地对它们进行操作。
3条答案
按热度按时间h43kikqp1#
这是一个想法,而不是一个完整的解决方案,我还没有测试它。
您可以从将数据处理管道提取到函数中开始。
如果你的文件很小,你可以调整
n
参数使用尽可能少的分区来容纳单个文件中的数据并避免乱序。这意味着您正在限制并发性,但我们稍后将继续讨论此问题。接下来,您必须获得输入文件的列表。此步骤取决于数据源,但大多数情况下,它或多或少是简单的:
接下来,您可以使用
pipeline
功能:由于我们将并发限制在单个文件的级别,因此我们希望通过提交多个作业来补偿。让我们添加一个简单的助手,强制求值并用
Future
```import scala.concurrent._
import ExecutionContext.Implicits.global
def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
df.rdd.foreach(_ => ()) // Force computation
df
}
val result = Future.sequence(
rdds.map(rdd => pipelineToFuture(rdd)).toList
)
s3fp2yjn2#
这样我们就可以并行地编写多个rdd
pxiryf3j3#
如果你有很多文件,而且每个文件都很小(你说300mb以上,我认为spark是小的),你可以尝试使用
SparkContext.wholeTextFiles
它将创建一个rdd,其中每个记录都是一个完整的文件。