结构化流与批处理性能差异

mccptt67  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(377)

我们有一项工作,可以在时间窗口内聚合数据。我们是spark的新手,我们观察到与流式作业和批处理作业在逻辑上运行相同查询的性能特征有显著不同。我们希望了解正在发生的事情,并找到可能的方法来提高基于结构化流的方法的速度。
在本文中,假设模式是

root
 |-- objectId: long (nullable = true)
 |-- eventTime: long (nullable = true)
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)

哪里 date 以及 hour 是(派生的)分区键,即Parquet文件存储在文件夹中,如 date=2020-07-26/hour=4 .
底层格式类型是delta湖。
一小时的数据大约有2亿个事件 objectId 分布广泛(一小时内观察到1000万个不同值,分布非常不均匀)
我们正在计算每个月的事件数 objectId ,装在5分钟桶里
底层源从kafka队列流式传输到(并且每分钟运行一次)
adl2上每分钟出现两个新文件,每个文件大小为25mb(实际文件包含10个以上未显示的附加列)
我们运行的是结构化流媒体作业,基本上是:

df.read.format("delta")
  .withWatermark("7 minutes") // watermark only applied to streaming query
  .groupBy($"date", $"hour", $"objectId", window($"eventTime", "5 minutes"))
  .coalesce(1) // debatable; we like limited number of files
  .partitionBy("date", "hour")
  .writeStream
  .format("delta")
  .option("checkpointLocation", <...>)
  .partitionBy("date", "hour")
  .start(<destination url>)
  .awaitTermination

除了 withWatermark 以及类似的替代品 writeStream 它从完全相同的源读取,所以它将读取完全相同的文件,具有相同的大小等等。
我们正在运行这些:
azure数据包
azure数据湖2代
观察:
批处理作业能够在大约一分钟内聚合一个小时,在尽可能小的集群上运行(3x f4s)
结构化流式处理作业ooms,即使使用(3x ds3\u v2),因此我们必须配置更大的示例(3x l4s,每个节点32gb)
CPU实际上处于空闲状态(97.4%空闲)
每个微批次需要30-60秒(几乎完全用在 addBatch )
低网络活动(可能为2mb/s)
一般来说,我有一种感觉,当数据量增加时,流媒体工作将无法维持(我们计划10倍的流量)
我的理解是,流式查询在给定水印(7分钟)和窗口大小(5分钟)的情况下只需向后看不到15分钟,直到它可以写出一个5分钟的窗口并放弃所有相关的状态。
问题:
为什么基于结构化流的解决方案需要如此多的内存?
假设我们必须为1000万个条目保持状态,我看不出我们怎么会需要这么多
考虑到流作业处于空闲状态,是什么导致流作业处理时间过长?
我应该看什么样的指标(这里是spark新手)?

tvz2xvvm

tvz2xvvm1#

df.read.format(“增量”)
看起来您正在创建一个静态Dataframe,然后将此静态Dataframe转换为流式Dataframe。聚合应用于静态Dataframe,因此窗口可能无法工作。尝试创建流式Dataframe:

val DF = spark
  .readStream
  .format("delta")...

这里可以找到一些例子https://docs.databricks.com/delta/delta-streaming.html#delta-表-as-a-stream-source

相关问题