我有一个spark readStream函数,它可以连续地从Kafka读取数据。我对数据执行了一些操作,并希望使用Spark writeStream将其批量写入Cassandra DB。在写入Cassandra时,它可能会抛出任何类型的异常(ConnectionTimeOut等)。我可以做些什么来确保数据不会丢失,以及我可以做些什么来对特定的一批数据执行重试。
这是我的writeStream函数,它在内部调用保存_to_db方法,我们在该方法中执行对表的写入。
query = df.writeStream \
.outputMode("append") \
.option("checkpointLocation", "path") \
.option("failOnDataLoss", "false") \
.option("maxAttempts", "5") \ #chatGpt has provided this no reference on web
.option("retryOnDataLoss", "true") \ #chatGpt has provided this no reference on web
.option("failedWriteFile", "path") \ #chatGpt has provided this no reference on web
.trigger(processingTime="5 seconds") \
.foreachBatch(save_to_db) \
.start()
这就是保存_to_db方法。
`def save_to_db(batch_df, batch_id):
try:
# Write the batch DataFrame to Cassandra
(batch_df.write
.format("org.apache.spark.sql.cassandra")
.options(table=tableName, keyspace=keyspaceName)
.mode("append")
.save())
return None
except Exception as e:
raise e`
就我所知,当保存_to_db方法抛出异常时,spark函数会再次重试该批处理,直到重试次数耗尽。即使仍然失败,它也会写入指定的路径并继续下一批处理。
chatGpt提供的方法是否仍然有效。我没有在官方sparkDocs或spark-cassandra-connector lib中找到任何参考。或者有任何其他替代方案。
https://github.com/datastax/spark-cassandra-connector
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch
1条答案
按热度按时间x6h2sr281#
配置:
可以毫无问题地删除,当异常发生时,spark已经处理失败任务的重试。