在scala/spark应用程序中,我创建了dataframe。我计划在整个程序中多次使用这个Dataframe。所以我决定 .cache()
方法。正如您在循环中看到的那样,我使用不同的值对Dataframe进行了多次过滤。因为某种原因 .count()
方法返回总是相同的结果。实际上,它必须返回两个不同的计数值。同时,我注意到Mesos中的奇怪行为。感觉就像 .cache()
方法未被执行。创建Dataframe后,程序转到代码的这一部分 if (!df.head(1).isEmpty)
而且表演了很长时间。我假设缓存进程将运行很长时间,其他进程将使用此缓存并快速运行。你认为问题出在哪里?
import org.apache.spark.sql.DataFrame
var df: DataFrame = spark
.read
.option("delimiter", "|")
.csv("/path_to_the_files/")
.filter(col("col5").isin("XXX", "YYY", "ZZZ"))
df.cache()
var array1 = Array("111", "222")
var array2 = Array("333")
var storage = Array(array1, array2)
if (!df.head(1).isEmpty) {
for (item <- storage) {
df.filter(
col("col1").isin(item:_*)
)
println("count: " + df.count())
}
}
1条答案
按热度按时间juzqafwq1#
实际上,它必须返回两个不同的计数值。
为什么?你是在同一个地方打电话
df
. 也许你的意思是我假设缓存进程将运行很长时间,其他进程将使用此缓存并快速运行。
是的,但仅当依赖于此Dataframe的第一个操作被执行时,并且
head
这就是行动。所以你应该期待程序转到代码的这一部分
if (!df.head(1).isEmpty)
而且表演了很长时间如果没有缓存,您也将获得相同的时间
df.count()
调用,除非spark检测到它并自行启用缓存。