我有一个Spark流作业触发每天使用触发器。一次方法,由于业务需求。
StreamingQuery query = processed
.writeStream()
.outputMode("append")
.format("parquet")
.option("path", resultPath)
.option("checkpointLocation", checkpointLocationPathForDate)
.trigger(Trigger.Once())
.start();
我使用map flatMapGroupsWithState
来存储分组数据的状态(GroupState
)。我在某个地方读到checkpointLocation对于每个StreamingQuery应该是不同的。因此我使用了如下的checkpointLocation:/path/to/nfs/checkpoint/<current date in format: yyyyMMdd>
Spark job每天都会处理文件夹/path/to/data/<current date in format: yyyyMMdd>
中的文件
我想访问昨天的Spark作业的状态,因为昨天的数据可能包含今天的数据所需的相关状态。
然而,Spark将状态数据存储在checkpointLocation中,即/path/to/nfs/checkpoint/<current date in format: yyyyMMdd>/<queryName>/state
,因此当使用不同的checkpointLocation时,无法访问它。
那么,我怎样才能访问存储在先前Spark作业checkpointLocation中的GroupState数据呢?可以对不同的StreamingQueries使用相同的checkpointLocation吗?
编辑:我尝试对昨天的StreamingQuery和今天的StreamingQuery使用相同的checkpointLocation,并且Spark恢复了昨天的批处理状态,这是我想要的,但是是否在任何地方记录了这一点?这是预期行为还是在每天的批处理之间使用相同的checkpointLocation时可能出现的不当行为?
- 编辑2:**
数据在S3以Parquet格式存储,路径:s3a://bucket/batchdata/year=2022/month=01/day=19/
- 2022年1月19日的样本数据:**
s3a://bucket/batchdata/year=2022/month=01/day=19/a.parquet
s3a://bucket/batchdata/year=2022/month=01/day=19/b.parquet
s3a://bucket/batchdata/year=2022/month=01/day=19/c.parquet
使用Spark parquet readStream方法读取数据:
// .parquet(...) is called for 2022-01-19
Dataset<Row> dataset = spark
.readStream()
.schema(PARQUET_SCHEMA)
.parquet("s3a://bucket/batchdata/year=2022/month=01/day=19/");
Dataset<Row> processed = dataset.groupByKey(keyFuncion,encoder)
.flatMapGroupsWithState(flatMapStateFunc,
OutputMode.Append(),
stateEncoder,
outputEncoder,
GroupStateTimeout.ProcessingTimeTimeout());
StreamingQuery query = processed.writeStream()
.outputMode("append")
.format("parquet")
.option("path", resultPath)
.option("checkpointLocation", checkpointLocation)
.trigger(Trigger.Once())
.start();
query.awaitTermination();
第二天运行相同的作业,第二天的 parquet 文件存储在:s3a://bucket/batchdata/year=2022/month=01/day=20/
- 2022年1月20日样本数据:**
一个三个三个一个
1条答案
按热度按时间qacovj5a1#
如何访问存储在先前Spark作业checkpointLocation中的GroupState数据?
你不应该这样做。从技术上讲,你可以这样做(需要一些额外的编码),但是有太多的东西是特定于另一个查询的(例如,有状态的操作符ID),你应该考虑到。使用风险自担。
是否可以对不同的StreamingQueries使用相同的checkpointLocation?
不可以。您不应该在不同的流查询之间共享相同的
checkpointLocation
。一个原因是它们的运算符不同,因此数字可能不匹配,即使它们匹配,接收器也可能不同,因此一些数据可能会被跳过(因为已经处理过了)。我尝试对昨天的StreamingQuery和今天的StreamingQuery使用相同的checkpointLocation,并且Spark恢复了昨天的批处理状态,这是我想要的,但是是否在任何地方记录了这一点?当在每天的批处理之间使用相同的checkpointLocation时,这是预期行为还是可能的不当行为?
这是有文档记录的,这正是
checkpointLocation
应该如何工作的,它是给定时间流查询状态的目录。引用使用检查点从故障中恢复:
在失败或有意关闭的情况下,可以恢复上一个查询的上一个进度和状态,并从中断处继续。这是通过检查点和预写日志完成的。可以使用检查点位置、查询将保存所有进度信息(即每个触发器中处理的偏移范围)和运行聚合(例如快速示例中的字数)到检查点位置。此检查点位置必须是HDFS兼容文件系统中的路径,并且可以在启动查询时设置为DataStreamWriter中的选项。