Custom retry policy for the DataStax Java driver

8wtpewkr, posted on 2021-06-09 in Cassandra

I have written a custom retry policy class that lets me pass in the number of retries the driver will perform for onWriteTimeout, onUnavailable, and onReadTimeout.

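import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.connection.ClosedConnectionException;
import com.datastax.oss.driver.api.core.connection.HeartbeatException;
import com.datastax.oss.driver.api.core.retry.RetryDecision;
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
import com.datastax.oss.driver.api.core.servererrors.CoordinatorException;
import com.datastax.oss.driver.api.core.servererrors.DefaultWriteType;
import com.datastax.oss.driver.api.core.servererrors.ReadFailureException;
import com.datastax.oss.driver.api.core.servererrors.WriteFailureException;
import com.datastax.oss.driver.api.core.servererrors.WriteType;
import com.datastax.oss.driver.api.core.session.Request;
// Assumed: Guava's annotation; the original post does not show its imports.
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomRetryPolicy implements RetryPolicy {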

  private static final Logger LOG = LoggerFactory.getLogger(CustomRetryPolicy.class);

  @VisibleForTesting
  public static final String RETRYING_ON_READ_TIMEOUT =
      "[{}] Retrying on read timeout on same host (consistency: {}, required responses: {}, "
          + "received responses: {}, data retrieved: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_WRITE_TIMEOUT =
      "[{}] Retrying on write timeout on same host (consistency: {}, write type: {}, "
          + "required acknowledgments: {}, received acknowledgments: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_UNAVAILABLE =
      "[{}] Retrying on unavailable exception on next host (consistency: {}, "
          + "required replica: {}, alive replica: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_ABORTED =
      "[{}] Retrying on aborted request on next host (retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_ERROR =
      "[{}] Retrying on node error on next host (retries: {})";

  private static final String LOG_PREFIX = "DATASTORE-CASSANDRA";

  private final int readAttempts;
  private final int writeAttempts;
  private final int unavailableAttempts;

  public CustomRetryPolicy(int readAttempts, int writeAttempts, int unavailableAttempts) {
    this.readAttempts = readAttempts;
    this.writeAttempts = writeAttempts;
    this.unavailableAttempts = unavailableAttempts;
  }

  @Override
  public RetryDecision onReadTimeout(Request request, ConsistencyLevel cl, int blockFor,
      int received, boolean dataPresent, int retryCount) {

    RetryDecision decision = (retryCount < readAttempts && received >= blockFor && !dataPresent)
        ? RetryDecision.RETRY_SAME
        : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_READ_TIMEOUT, LOG_PREFIX, cl, blockFor, received, false, retryCount);
    }

    return decision;
  }

  @Override
  public RetryDecision onWriteTimeout(Request request, ConsistencyLevel cl, WriteType writeType,
      int blockFor, int received, int retryCount) {
    RetryDecision decision = (retryCount < writeAttempts && writeType == DefaultWriteType.BATCH_LOG)
        ? RetryDecision.RETRY_SAME
        : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_WRITE_TIMEOUT, LOG_PREFIX, cl, writeType, blockFor, received,
          retryCount);
    }
    return decision;
  }

  @Override
  public RetryDecision onUnavailable(Request request, ConsistencyLevel cl, int required, int alive,
      int retryCount) {
    RetryDecision decision =
        (retryCount < unavailableAttempts) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_UNAVAILABLE, LOG_PREFIX, cl, required, alive, retryCount);
    }

    return decision;
  }

  @Override
  public RetryDecision onRequestAborted(Request request, Throwable error, int retryCount) {
    RetryDecision decision =
        (error instanceof ClosedConnectionException || error instanceof HeartbeatException)
            ? RetryDecision.RETRY_NEXT
            : RetryDecision.RETHROW;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_ABORTED, LOG_PREFIX, retryCount, error);
    }

    return decision;
  }

  @Override
  public RetryDecision onErrorResponse(Request request, CoordinatorException error,
      int retryCount) {
    RetryDecision decision =
        (error instanceof ReadFailureException || error instanceof WriteFailureException)
            ? RetryDecision.RETHROW
            : RetryDecision.RETRY_NEXT;

    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_ERROR, LOG_PREFIX, retryCount, error);
    }

    return decision;
  }

  @Override
  public void close() {

    // Nothing to do

  }

}

I am using DataStax Java driver 4.6.0. The problem is that I cannot pass an instance of this class through CqlSessionBuilder, which was possible with code like

RetryPolicy rc = new CustomRetryPolicy(3, 3, 2);
Cluster cluster = Cluster.builder().addContactPoint("192.168.0.0").withRetryPolicy(rc).build();

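in older versions of the driver. I tried using DriverConfigLoader, but it only offers an option to pass the name of the custom class.
Could you suggest a way to do this?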

8fsztsew #1

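If you look at the implementation of DefaultRetryPolicy and at the CustomRetryPolicy example, you will see that both have a constructor that receives two parameters: context, of type DriverContext, and a string with the name of the profile. You can then use context to obtain the DriverConfig via a getConfig call, and call getProfile on that config to get the configuration values your custom policy needs. You can put your own configuration values into the configuration file and use them in the retry policy, like this: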

datastax-java-driver {
  advanced.retry-policy {
    class = DefaultRetryPolicy
  }
  profiles {
    custom-retries {
      advanced.retry-policy {
        class = CustomRetryPolicy
        custom-policy {
           read-attempts = 3
           write-attempts = 2
           ...
        }
      }
    }
  }
}
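
To make the lookup described above concrete, here is a minimal sketch of how the values under custom-policy might be read. The CustomRetryOption enum, the RetryLimits helper, the unavailable-attempts key, and the fallback value of 1 are all hypothetical names and choices made for illustration; only DriverContext, DriverConfig, DriverExecutionProfile, and DriverOption come from the driver. The option paths are relative to the datastax-java-driver section.

```java
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.config.DriverOption;
import com.datastax.oss.driver.api.core.context.DriverContext;

/** Hypothetical custom options; the paths mirror the config fragment above. */
enum CustomRetryOption implements DriverOption {
  READ_ATTEMPTS("advanced.retry-policy.custom-policy.read-attempts"),
  WRITE_ATTEMPTS("advanced.retry-policy.custom-policy.write-attempts"),
  // Assumed key name: the original fragment elides the remaining entries.
  UNAVAILABLE_ATTEMPTS("advanced.retry-policy.custom-policy.unavailable-attempts");

  private final String path;

  CustomRetryOption(String path) {
    this.path = path;
  }

  @Override
  public String getPath() {
    return path;
  }
}

/** Hypothetical holder for the attempt limits of one execution profile. */
final class RetryLimits {
  // Assumed fallback when an option is missing from the profile.
  private static final int DEFAULT_ATTEMPTS = 1;

  final int readAttempts;
  final int writeAttempts;
  final int unavailableAttempts;

  RetryLimits(DriverExecutionProfile profile) {
    this.readAttempts = read(profile, CustomRetryOption.READ_ATTEMPTS);
    this.writeAttempts = read(profile, CustomRetryOption.WRITE_ATTEMPTS);
    this.unavailableAttempts = read(profile, CustomRetryOption.UNAVAILABLE_ATTEMPTS);
  }

  /** Resolves the named profile through the context, as the answer describes. */
  static RetryLimits from(DriverContext context, String profileName) {
    return new RetryLimits(context.getConfig().getProfile(profileName));
  }

  private static int read(DriverExecutionProfile profile, DriverOption option) {
    return profile.isDefined(option) ? profile.getInt(option) : DEFAULT_ATTEMPTS;
  }
}

// In CustomRetryPolicy, the three-int constructor would then give way to the
// two-argument form that the driver instantiates reflectively:
//
//   public CustomRetryPolicy(DriverContext context, String profileName) {
//     RetryLimits limits = RetryLimits.from(context, profileName);
//     this.readAttempts = limits.readAttempts;
//     this.writeAttempts = limits.writeAttempts;
//     this.unavailableAttempts = limits.unavailableAttempts;
//   }
```

Two things to keep in mind when wiring this up. An unqualified class name in the config is looked up in the driver's own com.datastax.oss.driver.internal.core.retry package, so a policy that lives elsewhere needs its fully qualified name in class = .... A request only uses the custom-retries profile when it is selected for that request, for example with statement.setExecutionProfileName("custom-retries").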
