Apache Spark 如何在流作业的运行之间共享状态?

qlvxas9a  于 2023-01-21  发布在  Apache
关注(0)|答案(1)|浏览(160)

我有一个Spark流作业触发每天使用触发器。一次方法,由于业务需求。

StreamingQuery query = processed
                       .writeStream()
                       .outputMode("append")
                       .format("parquet")
                       .option("path", resultPath)
                       .option("checkpointLocation", checkpointLocationPathForDate)
                       .trigger(Trigger.Once())
                       .start();

我使用map flatMapGroupsWithState来存储分组数据的状态(GroupState)。我在某个地方读到checkpointLocation对于每个StreamingQuery应该是不同的。因此我使用了如下的checkpointLocation:/path/to/nfs/checkpoint/<current date in format: yyyyMMdd>
Spark job每天都会处理文件夹/path/to/data/<current date in format: yyyyMMdd>中的文件
我想访问昨天的Spark作业的状态,因为昨天的数据可能包含今天的数据所需的相关状态。
然而,Spark将状态数据存储在checkpointLocation中,即/path/to/nfs/checkpoint/<current date in format: yyyyMMdd>/<queryName>/state,因此当使用不同的checkpointLocation时,无法访问它。
那么,我怎样才能访问存储在先前Spark作业checkpointLocation中的GroupState数据呢?可以对不同的StreamingQueries使用相同的checkpointLocation吗?
编辑:我尝试对昨天的StreamingQuery和今天的StreamingQuery使用相同的checkpointLocation,并且Spark恢复了昨天的批处理状态,这是我想要的,但是是否在任何地方记录了这一点?这是预期行为还是在每天的批处理之间使用相同的checkpointLocation时可能出现的不当行为?

    • 编辑2:**

数据在S3以Parquet格式存储,路径:s3a://bucket/batchdata/year=2022/month=01/day=19/

    • 2022年1月19日的样本数据:**
s3a://bucket/batchdata/year=2022/month=01/day=19/a.parquet
s3a://bucket/batchdata/year=2022/month=01/day=19/b.parquet
s3a://bucket/batchdata/year=2022/month=01/day=19/c.parquet

使用Spark parquet readStream方法读取数据:

// .parquet(...) is called for 2022-01-19

Dataset<Row> dataset = spark
                    .readStream()
                    .schema(PARQUET_SCHEMA)
                    .parquet("s3a://bucket/batchdata/year=2022/month=01/day=19/");

Dataset<Row> processed = dataset.groupByKey(keyFuncion,encoder)
                                .flatMapGroupsWithState(flatMapStateFunc, 
                                                        OutputMode.Append(),
                                                        stateEncoder,
                                                        outputEncoder,
                                                        GroupStateTimeout.ProcessingTimeTimeout());
                                

StreamingQuery query = processed.writeStream()
                               .outputMode("append")
                               .format("parquet")
                               .option("path", resultPath)
                               .option("checkpointLocation", checkpointLocation)
                               .trigger(Trigger.Once())
                               .start();
query.awaitTermination();

第二天运行相同的作业,第二天的 parquet 文件存储在:s3a://bucket/batchdata/year=2022/month=01/day=20/

    • 2022年1月20日样本数据:**

一个三个三个一个

qacovj5a

qacovj5a1#

如何访问存储在先前Spark作业checkpointLocation中的GroupState数据?
你不应该这样做。从技术上讲,你可以这样做(需要一些额外的编码),但是有太多的东西是特定于另一个查询的(例如,有状态的操作符ID),你应该考虑到。使用风险自担。
是否可以对不同的StreamingQueries使用相同的checkpointLocation?
不可以。您不应该在不同的流查询之间共享相同的checkpointLocation。一个原因是它们的运算符不同,因此数字可能不匹配,即使它们匹配,接收器也可能不同,因此一些数据可能会被跳过(因为已经处理过了)。
我尝试对昨天的StreamingQuery和今天的StreamingQuery使用相同的checkpointLocation,并且Spark恢复了昨天的批处理状态,这是我想要的,但是是否在任何地方记录了这一点?当在每天的批处理之间使用相同的checkpointLocation时,这是预期行为还是可能的不当行为?
这是有文档记录的,这正是checkpointLocation应该如何工作的,它是给定时间流查询状态的目录。
引用使用检查点从故障中恢复:
在失败或有意关闭的情况下,可以恢复上一个查询的上一个进度和状态,并从中断处继续。这是通过检查点和预写日志完成的。可以使用检查点位置、查询将保存所有进度信息(即每个触发器中处理的偏移范围)和运行聚合(例如快速示例中的字数)到检查点位置。此检查点位置必须是HDFS兼容文件系统中的路径,并且可以在启动查询时设置为DataStreamWriter中的选项。

相关问题