有一个CSV文件的数据湖,全天更新。我试图创建一个Spark Structured Streaming作业,使用Trigger.Once
功能outlined in this blog post定期将已写入到Parquet数据湖中的CSV数据湖的新数据写入。
这是我的资料
val df = spark
.readStream
.schema(s)
.csv("s3a://csv-data-lake-files")
以下命令将所有数据写入Parquet湖,但在写入所有数据后没有停止(我不得不手动取消该作业)。
processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
以下作业也可以工作,但在写入所有数据后也没有停止(我不得不手动取消该作业):
val query = processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
query.awaitTermination()
以下命令在写入任何数据之前停止了查询。
val query = processedDf
.writeStream
.trigger(Trigger.Once)
.format("parquet")
.option("checkpointLocation", "s3-path-to-checkpoint")
.start("s3-path-to-parquet-lake")
query.stop()
如何配置writeStream
查询,使其等到所有增量数据都写入Parquet文件后再停止?
3条答案
按热度按时间n3h0vuf21#
我得到了结构化流+触发器。一次在 parquet 数据湖上正常工作。
我不认为它是与CSV数据湖工作,因为CSV数据湖有一吨的小文件在嵌套的目录。Spark不喜欢与小CSV文件工作(我认为它需要打开他们所有读取头),真的很讨厌当它需要glob S3目录。
所以我认为Spark结构化流+触发器。一旦代码是好的-他们只需要让CSV阅读器技术更好。
3bygqnnd2#
结构化流的主要目的是连续处理数据,而不需要在新数据到达时启动/停止流。请阅读本文了解更多详细信息。
从Spark 2.0.0开始,
StreamingQuery
有一个方法processAllAvailable
,它等待所有的源数据被处理并提交给sink。请注意scala docs声明这个方法只用于测试目的。因此,代码应该如下所示(如果您仍然需要它):
flseospp3#
解决方案必须包括一个外部触发器,如AWS事件或其他运行作业的东西。一旦运行了,它将拾取新的内容,查看检查点。您也可以使用像气流这样的东西来按计划运行它。Databricks有一个作业调度器。因此您有两个选择
1.使用诸如气流或数据块作业调度程序的工具,按计划运行,例如每小时或每天一次。
1.使用类似于AWS s3 write event的事件来触发作业。
1)的缺点是你可能会免费旋转集群,并支付$$。
2)的缺点是它比较复杂。你可以有一个类似队列的结构来确保那些消息不会丢失。优点是数据库有自动加载器,它可以为你做一些源的所有这些工作。自动加载器可以是连续流或运行一次的风格。