我正在尝试用一致性级别“each\ u quorum”向cassandra写入一个sparkDataframe。我的代码如下所示:
val sparkBuilder = SparkSession.builder().
config(cassandraHostPropertyProperty, cassandraHosts).
config(cassandraAuthUsernameProperty, CASSANDRA_AUTH_USER_KEY).
config(cassandraAuthPassProperty, CASSANDRA_AUTH_PASS_KEY).
config(cassandraIsSSLEnabledProperty, isSSLEnabled)...
getOrCreate();
下面是编写df的代码:
df.write.cassandraFormat(tableName, keySpaceName)
.mode(SaveMode.Append)
.options(Map(
WriteConf.ParallelismLevelParam.name -> parallelism_Level.toString,
WriteConf.BatchSizeRowsParam.name -> rowsInBatch.toString
))
.save()
我想添加一个重试策略,以便如果其中一个数据中心处于关闭状态,write会将一致性降级到本地仲裁。
我知道datastax有一个类multipleretrypolicy.scala,我应该扩展它,重写方法来添加自定义逻辑,并在cassandra conf中使用它的示例。
如何将此策略应用于sparksession或save操作?在scala中,有没有其他方法可以使用retrypolicy或不使用retrypolicy来实现我的需求?
1条答案
按热度按时间dgenwo3n1#
你不想
MultipleRetryPolicy
,您正在执行不属于spark驱动程序的downgradingconsistencyretrypolicy,因此将此作为驱动程序设置的一部分是不可行的,除非您将策略移植到scala。您可以做的是将查询执行 Package 成一个try-and-catch
UnavailableException
然后只需通过更改output.consistency.level参数以较低的一致性重试。