cassandra SparkStream当底层方法抛出异常时

whhtz7ly  于 2023-03-29  发布在  Cassandra
关注(0)|答案(1)|浏览(188)

我有一个spark readStream函数,它可以连续地从Kafka读取数据。我对数据执行了一些操作,并希望使用Spark writeStream将其批量写入Cassandra DB。在写入Cassandra时,它可能会抛出任何类型的异常(ConnectionTimeOut等)。我可以做些什么来确保数据不会丢失,以及我可以做些什么来对特定的一批数据执行重试。
这是我的writeStream函数,它在内部调用保存_to_db方法,我们在该方法中执行对表的写入。

query = df.writeStream \
    .outputMode("append") \
    .option("checkpointLocation", "path") \
    .option("failOnDataLoss", "false") \  
    .option("maxAttempts", "5") \            #chatGpt has provided this no reference on web
    .option("retryOnDataLoss", "true") \     #chatGpt has provided this no reference on web   
    .option("failedWriteFile", "path") \     #chatGpt has provided this no reference on web
    .trigger(processingTime="5 seconds") \
    .foreachBatch(save_to_db) \
    .start()

这就是保存_to_db方法。

`def save_to_db(batch_df, batch_id):
    try:
        # Write the batch DataFrame to Cassandra
        (batch_df.write
         .format("org.apache.spark.sql.cassandra")
         .options(table=tableName, keyspace=keyspaceName)
         .mode("append")
         .save())
        return None
    except Exception as e:
        raise e`

就我所知,当保存_to_db方法抛出异常时,spark函数会再次重试该批处理,直到重试次数耗尽。即使仍然失败,它也会写入指定的路径并继续下一批处理。
chatGpt提供的方法是否仍然有效。我没有在官方sparkDocs或spark-cassandra-connector lib中找到任何参考。或者有任何其他替代方案。
https://github.com/datastax/spark-cassandra-connector
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch

x6h2sr28

x6h2sr281#

配置:

.option("retryOnDataLoss", "true")
   .option("failedWriteFile", "path")

可以毫无问题地删除,当异常发生时,spark已经处理失败任务的重试。

相关问题