scala 结合Trigger.Once使用Spark结构化流

57hvy0tb  于 2022-12-13  发布在  Scala
关注(0)|答案(3)|浏览(154)

有一个CSV文件的数据湖,全天更新。我试图创建一个Spark Structured Streaming作业,使用Trigger.Once功能outlined in this blog post定期将已写入到Parquet数据湖中的CSV数据湖的新数据写入。
这是我的资料

val df = spark
  .readStream
  .schema(s)
  .csv("s3a://csv-data-lake-files")

以下命令将所有数据写入Parquet湖,但在写入所有数据后没有停止(我不得不手动取消该作业)。

processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

以下作业也可以工作,但在写入所有数据后也没有停止(我不得不手动取消该作业):

val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.awaitTermination()

以下命令在写入任何数据之前停止了查询。

val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.stop()

如何配置writeStream查询,使其等到所有增量数据都写入Parquet文件后再停止?

n3h0vuf2

n3h0vuf21#

我得到了结构化流+触发器。一次在 parquet 数据湖上正常工作。
我不认为它是与CSV数据湖工作,因为CSV数据湖有一吨的小文件在嵌套的目录。Spark不喜欢与小CSV文件工作(我认为它需要打开他们所有读取头),真的很讨厌当它需要glob S3目录。
所以我认为Spark结构化流+触发器。一旦代码是好的-他们只需要让CSV阅读器技术更好。

3bygqnnd

3bygqnnd2#

结构化流的主要目的是连续处理数据,而不需要在新数据到达时启动/停止流。请阅读本文了解更多详细信息。
从Spark 2.0.0开始,StreamingQuery有一个方法processAllAvailable,它等待所有的源数据被处理并提交给sink。请注意scala docs声明这个方法只用于测试目的。
因此,代码应该如下所示(如果您仍然需要它):

query.processAllAvailable
query.stop
flseospp

flseospp3#

解决方案必须包括一个外部触发器,如AWS事件或其他运行作业的东西。一旦运行了,它将拾取新的内容,查看检查点。您也可以使用像气流这样的东西来按计划运行它。Databricks有一个作业调度器。因此您有两个选择
1.使用诸如气流或数据块作业调度程序的工具,按计划运行,例如每小时或每天一次。
1.使用类似于AWS s3 write event的事件来触发作业。
1)的缺点是你可能会免费旋转集群,并支付$$。
2)的缺点是它比较复杂。你可以有一个类似队列的结构来确保那些消息不会丢失。优点是数据库有自动加载器,它可以为你做一些源的所有这些工作。自动加载器可以是连续流或运行一次的风格。

相关问题