我尝试为一些Spark操作做一个"简单的测量时间"(endTime - startTime
),以了解做相同逻辑的不同替代方案。
def measureElapsedTime[T](f: => T): (T, Long) = {
val start = System.nanoTime()
val result: T = f
val end = System.nanoTime()
(result, NANOSECONDS.toMillis(end - start))
}
- 我从循环中得到一个递减的处理时间序列。为什么会这样?**
比方说,我正在测量下面这个简单的运算:
def t1(): (Array[Row], Long) = measureElapsedTime {
val res = spark.read.json(nestedDF)
res.collect()
}
通过以下方式进行测量:
val n = 1000
def avg(f: () => (Array[Row], Long)) = 1.to(n).map(_ => f()._2).sum / n
println(avg(t1))
平均会给我大约45毫秒。
- 但是,如果我看一下序列的开头,我可以清楚地看到处理时间减少了。**例如,按执行顺序测量的时间(ms),对于第一个执行:
707
157
144
119
153
108
99
105
121
107
132
89
96
100
83
93
87
94
73
- 为什么会出现这种模式?是因为JVM预热时间吗?还是因为某些spark优化?**
我不认为原因是JVM预热时间,因为不重用"相同" DF的操作不会发生这种情况(无论如何,我在上一个示例之前添加了一些其他操作来预热JVM)。
def t2(): (Array[Row], Long) = measureElapsedTime {
val res: Dataset[String] = nestedDF.map((str: String) => {
val json: Value = ujson.read(str)
ujson.write(json)
})
spark.read.json(res).collect()
}
其中运行时间序列为:
44
141
93
92
79
78
79
84
76
80
78
77
71
71
70
71
103
74
69
72
问题背景:* * 我正在测量一个spark结构化流应用的操作,所以我非常肯定,对于每个微批处理,我都将获得第一个测量值,因为每个微批处理都是一个新的DF。然而,我可能会错,这就是我问这个问题的原因。**
谢谢大家。
说明:
上面示例中的nestedDF
是一个具有JSON字符串值的Dataset[String]
。
+-----------------------------------------------------------------------------------------------+
|value |
+-----------------------------------------------------------------------------------------------+
|{"simple": "a", "nested": {"c1": "a"}} |
|{"simple": "a", "more-nested": {"c1": {"c11": {"c111": "a"}}}} |
|{"simple": "a", "nested-with-diff-types": {"array": ["a", "b"], "obj": {"c1": {"c11": "a"}}}}|
|{"simple": "a", "nested-with-array": {"c1": {"c11": ["a", "b", "c"]}}} |
|{"simple": "a", "nested-with-array-with-obj-elem": {"c1": {"c11": [{"a": "a"}, {"b": "b"}]}}}|
+-----------------------------------------------------------------------------------------------+
1条答案
按热度按时间yyhrrdl81#
您的代码实际上是在一个循环中运行以下Spark代码(其周围的代码只是Scala):
Spark优化?
为了了解Spark是否正在进行优化,让我们使用
explain
方法,它显示了Spark将使用的物理查询执行计划,这是在Spark v3.3.1 spark-shell中完成的:结论
正如您所看到的,物理计划在Spark级别上是完全相同的,因此在那里没有进行优化。
JVM优化?
ApacheSpark将脚本转换为Java字节码的方式超出了我所知道的范围,但有一种方法可以利用我们对JVM的知识来了解是否正在进行一些优化。
如果我们讨论的是执行速度越来越快的代码,我们可以更具体地看看代码缓存,这是一个存储JIT编译器在编译Java字节码时创建的代码的区域。
使用
jconsole
工具(Java 8 JDK的一部分),我在运行代码时监视了代码缓存,并对measureElapsedTime
做了如下修改,以获取运行的绝对时间戳:并创建了一个
timeTaken
方法而不是您的avg
方法来保存每次运行的值绘制代码高速缓存利用率和计算运行所用的时间,如下图所示:
几点意见:
结论
正如你已经注意到的,代码明显地加速了。这种加速与代码缓存的额外使用密切相关。这也是有道理的,因为JIT编译器做了越来越多的工作来编译重复的代码,每次运行都会执行得更快。