在使用spark结构化流媒体时,我很难理解检查点是如何工作的。
我有一个spark进程,它生成一些事件,我将这些事件记录在一个配置单元表中。对于这些事件,我在kafka流中接收一个确认事件。
我创造了一个新的Spark过程
将配置单元日志表中的事件读取到Dataframe中
使用spark结构化流将这些事件与确认事件流连接起来
将联接的Dataframe写入hbase表。
我在sparkshell中测试了代码,在伪代码(我使用的是scala)下面,它运行良好。
val tableA = spark.table("tableA")
val startingOffset = "earliest"
val streamOfData = .readStream
.format("kafka")
.option("startingOffsets", startingOffsets)
.option("otherOptions", otherOptions)
val joinTableAWithStreamOfData = streamOfData.join(tableA, Seq("a"), "inner")
joinTableAWithStreamOfData
.writeStream
.foreach(
writeDataToHBaseTable()
).start()
.awaitTermination()
现在,我想将此代码安排为定期运行,例如每15分钟运行一次,我正在努力理解如何在这里使用检查点。
每次运行这段代码时,我都希望只从流中读取上一次运行中尚未读取的事件,并将这些新事件与日志表进行内部连接,以便只向最终的hbase表中写入新数据。
我在hdfs中创建了一个目录来存储检查点文件。我将该位置提供给spark submit命令,我使用该命令调用spark代码。
spark-submit --conf spark.sql.streaming.checkpointLocation=path_to_hdfs_checkpoint_directory
--all_the_other_settings_and_libraries
此时,代码每15分钟运行一次,没有任何错误,但它基本上什么都不做,因为它没有将新事件转储到hbase表中。另外,检查点目录是空的,而我假设一些文件必须写入那里?
readstream函数是否需要调整以便从最新的检查点开始读取?
val streamOfData = .readStream
.format("kafka")
.option("startingOffsets", startingOffsets) ??
.option("otherOptions", otherOptions)
我真的很难理解关于这个的spark文档。
提前谢谢!
1条答案
按热度按时间ldfqzlk81#
触发器
“现在,我希望将此代码安排为定期运行,例如每15分钟运行一次,我正在努力理解如何在这里使用检查点。
如果您希望每15分钟触发一次作业,可以使用触发器。
您不需要专门“使用”检查点,只需提供一个可靠的(例如hdfs)检查点位置,如下所示。
检查点
每次运行这段代码时,我只想从流中读取上一次运行中尚未读取的事件[…]”
在spark结构化流应用程序中从kafka读取数据时,最好直接在您的应用程序中设置检查点位置
StreamingQuery
. spark使用此位置创建检查点文件,用于跟踪应用程序的状态,并记录已从kafka读取的偏移量。重新启动应用程序时,它将检查这些检查点文件,以了解从何处继续读取Kafka,因此它不会跳过或错过任何消息。您不需要手动设置启动偏移量。
请务必记住,只允许对应用程序代码进行特定更改,以便检查点文件可用于安全重新启动。在structured streaming programming guide中可以找到一个很好的概述,该指南介绍了流查询更改后的恢复语义。
总的来说,对于高效的spark结构化流媒体应用程序,我建议采用以下结构: