jvm Spark DF处理时间:为什么循环相同的操作会减少处理时间?

w41d8nur  于 2023-01-30  发布在  Spark
关注(0)|答案(1)|浏览(194)

我尝试为一些Spark操作做一个"简单的测量时间"(endTime - startTime),以了解做相同逻辑的不同替代方案。

def measureElapsedTime[T](f: => T): (T, Long) = {

  val start = System.nanoTime()
  val result: T = f
  val end = System.nanoTime()

  (result, NANOSECONDS.toMillis(end - start))
}
    • 我从循环中得到一个递减的处理时间序列。为什么会这样?**

比方说,我正在测量下面这个简单的运算:

def t1(): (Array[Row], Long) = measureElapsedTime {
  val res = spark.read.json(nestedDF)
  res.collect()
}

通过以下方式进行测量:

val n = 1000
def avg(f: () => (Array[Row], Long)) = 1.to(n).map(_ => f()._2).sum / n
println(avg(t1))

平均会给我大约45毫秒。

    • 但是,如果我看一下序列的开头,我可以清楚地看到处理时间减少了。**例如,按执行顺序测量的时间(ms),对于第一个执行:
707
157
144
119
153
108
99
105
121
107
132
89
96
100
83
93
87
94
73
    • 为什么会出现这种模式?是因为JVM预热时间吗?还是因为某些spark优化?**

我不认为原因是JVM预热时间,因为不重用"相同" DF的操作不会发生这种情况(无论如何,我在上一个示例之前添加了一些其他操作来预热JVM)。

def t2(): (Array[Row], Long) = measureElapsedTime {
  val res: Dataset[String] = nestedDF.map((str: String) => {
    val json: Value = ujson.read(str)
    ujson.write(json)
  })
  spark.read.json(res).collect()
}

其中运行时间序列为:

44
141
93
92
79
78
79
84
76
80
78
77
71
71
70
71
103
74
69
72

问题背景:* * 我正在测量一个spark结构化流应用的操作,所以我非常肯定,对于每个微批处理,我都将获得第一个测量值,因为每个微批处理都是一个新的DF。然而,我可能会错,这就是我问这个问题的原因。**
谢谢大家。
说明:
上面示例中的nestedDF是一个具有JSON字符串值的Dataset[String]

+-----------------------------------------------------------------------------------------------+
|value                                                                                          |
+-----------------------------------------------------------------------------------------------+
|{"simple":  "a", "nested":  {"c1":  "a"}}                                                      |
|{"simple":  "a", "more-nested":  {"c1": {"c11": {"c111":  "a"}}}}                              |
|{"simple":  "a", "nested-with-diff-types": {"array": ["a", "b"], "obj": {"c1": {"c11":  "a"}}}}|
|{"simple":  "a", "nested-with-array":  {"c1": {"c11": ["a", "b", "c"]}}}                       |
|{"simple":  "a", "nested-with-array-with-obj-elem":  {"c1": {"c11": [{"a": "a"}, {"b": "b"}]}}}|
+-----------------------------------------------------------------------------------------------+
yyhrrdl8

yyhrrdl81#

您的代码实际上是在一个循环中运行以下Spark代码(其周围的代码只是Scala):

val res = spark.read.json(nestedDF)
res.collect()

Spark优化?

为了了解Spark是否正在进行优化,让我们使用explain方法,它显示了Spark将使用的物理查询执行计划,这是在Spark v3.3.1 spark-shell中完成的:

val nestedDF = Seq(
  """{"simple":  "a", "nested":  {"c1":  "a"}}""",
  """{"simple":  "a", "more-nested":  {"c1": {"c11": {"c111":  "a"}}}}""",
  """{"simple":  "a", "nested-with-diff-types": {"array": ["a", "b"], "obj": {"c1": {"c11":  "a"}}}}""",
  """{"simple":  "a", "nested-with-array":  {"c1": {"c11": ["a", "b", "c"]}}}""",
  """{"simple":  "a", "nested-with-array-with-obj-elem":  {"c1": {"c11": [{"a": "a"}, {"b": "b"}]}}}"""
).toDF.as[String]

val res = spark.read.json(nestedDF)

res.explain
== Physical Plan ==
*(1) Scan ExistingRDD[more-nested#9,nested#10,nested-with-array#11,nested-with-array-with-obj-elem#12,nested-with-diff-types#13,simple#14]

res.collect

val res = spark.read.json(nestedDF)

res.explain
== Physical Plan ==
*(1) Scan ExistingRDD[more-nested#28,nested#29,nested-with-array#30,nested-with-array-with-obj-elem#31,nested-with-diff-types#32,simple#33]

res.collect
结论

正如您所看到的,物理计划在Spark级别上是完全相同的,因此在那里没有进行优化。

JVM优化?

ApacheSpark将脚本转换为Java字节码的方式超出了我所知道的范围,但有一种方法可以利用我们对JVM的知识来了解是否正在进行一些优化。
如果我们讨论的是执行速度越来越快的代码,我们可以更具体地看看代码缓存,这是一个存储JIT编译器在编译Java字节码时创建的代码的区域。
使用jconsole工具(Java 8 JDK的一部分),我在运行代码时监视了代码缓存,并对measureElapsedTime做了如下修改,以获取运行的绝对时间戳:

def measureElapsedTime[T](f: => T): (T, Long, Long) = {
  val now = Instant.now().toEpochMilli()
  val start = System.nanoTime()
  val result: T = f
  val end = System.nanoTime()
  (result, TimeUnit.NANOSECONDS.toMillis(end - start), now)
}

并创建了一个timeTaken方法而不是您的avg方法来保存每次运行的值

def timeTaken(f: () => (Array[Row], Long, Long)) = 1.to(n).map(_ => (f()._2, f()._3))

绘制代码高速缓存利用率和计算运行所用的时间,如下图所示:

几点意见:

  • 我对第一张图表的y轴使用了对数比例,因为在第一部分之后的轻微加速不太明显
  • 代码缓存图在耗时图之前开始,这是因为我在实际运行代码之前启动了spark-shell
结论

正如你已经注意到的,代码明显地加速了。这种加速与代码缓存的额外使用密切相关。这也是有道理的,因为JIT编译器做了越来越多的工作来编译重复的代码,每次运行都会执行得更快。

相关问题