datastax spark cassandra连接器,带有retrypolicy,用于将df写入cassandra表

iyfjxgzm  于 2021-06-14  发布在  Cassandra
关注(0)|答案(1)|浏览(396)

我正在尝试用一致性级别“each\ u quorum”向cassandra写入一个sparkDataframe。我的代码如下所示:

val sparkBuilder = SparkSession.builder().
  config(cassandraHostPropertyProperty, cassandraHosts).
  config(cassandraAuthUsernameProperty, CASSANDRA_AUTH_USER_KEY).
  config(cassandraAuthPassProperty, CASSANDRA_AUTH_PASS_KEY).
  config(cassandraIsSSLEnabledProperty, isSSLEnabled)...
  getOrCreate();

下面是编写df的代码:

df.write.cassandraFormat(tableName, keySpaceName)
    .mode(SaveMode.Append)
    .options(Map(
      WriteConf.ParallelismLevelParam.name -> parallelism_Level.toString,
      WriteConf.BatchSizeRowsParam.name -> rowsInBatch.toString
    ))
    .save()

我想添加一个重试策略,以便如果其中一个数据中心处于关闭状态,write会将一致性降级到本地仲裁。
我知道datastax有一个类multipleretrypolicy.scala,我应该扩展它,重写方法来添加自定义逻辑,并在cassandra conf中使用它的示例。
如何将此策略应用于sparksession或save操作?在scala中,有没有其他方法可以使用retrypolicy或不使用retrypolicy来实现我的需求?

dgenwo3n

dgenwo3n1#

你不想 MultipleRetryPolicy ,您正在执行不属于spark驱动程序的downgradingconsistencyretrypolicy,因此将此作为驱动程序设置的一部分是不可行的,除非您将策略移植到scala。
您可以做的是将查询执行 Package 成一个try-and-catch UnavailableException 然后只需通过更改output.consistency.level参数以较低的一致性重试。

相关问题