I have written a custom retry policy class to which I can pass the number of retries the driver should perform for onWriteTimeout, onUnavailable and onReadTimeout.
public class CustomRetryPolicy implements RetryPolicy {

  private static final Logger LOG = LoggerFactory.getLogger(CustomRetryPolicy.class);

  @VisibleForTesting
  public static final String RETRYING_ON_READ_TIMEOUT =
      "[{}] Retrying on read timeout on same host (consistency: {}, required responses: {}, "
          + "received responses: {}, data retrieved: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_WRITE_TIMEOUT =
      "[{}] Retrying on write timeout on same host (consistency: {}, write type: {}, "
          + "required acknowledgments: {}, received acknowledgments: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_UNAVAILABLE =
      "[{}] Retrying on unavailable exception on next host (consistency: {}, "
          + "required replica: {}, alive replica: {}, retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_ABORTED =
      "[{}] Retrying on aborted request on next host (retries: {})";

  @VisibleForTesting
  public static final String RETRYING_ON_ERROR =
      "[{}] Retrying on node error on next host (retries: {})";

  private static final String LOG_PREFIX = "DATASTORE-CASSANDRA";

  private final int readAttempts;
  private final int writeAttempts;
  private final int unavailableAttempts;

  public CustomRetryPolicy(int readAttempts, int writeAttempts, int unavailableAttempts) {
    this.readAttempts = readAttempts;
    this.writeAttempts = writeAttempts;
    this.unavailableAttempts = unavailableAttempts;
  }

  @Override
  public RetryDecision onReadTimeout(Request request, ConsistencyLevel cl, int blockFor,
      int received, boolean dataPresent, int retryCount) {
    RetryDecision decision = (retryCount < readAttempts && received >= blockFor && !dataPresent)
        ? RetryDecision.RETRY_SAME
        : RetryDecision.RETHROW;
    if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_READ_TIMEOUT, LOG_PREFIX, cl, blockFor, received, false, retryCount);
    }
    return decision;
  }

  @Override
  public RetryDecision onWriteTimeout(Request request, ConsistencyLevel cl, WriteType writeType,
      int blockFor, int received, int retryCount) {
    RetryDecision decision = (retryCount < writeAttempts && writeType == DefaultWriteType.BATCH_LOG)
        ? RetryDecision.RETRY_SAME
        : RetryDecision.RETHROW;
    if (decision == RetryDecision.RETRY_SAME && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_WRITE_TIMEOUT, LOG_PREFIX, cl, writeType, blockFor, received,
          retryCount);
    }
    return decision;
  }

  @Override
  public RetryDecision onUnavailable(Request request, ConsistencyLevel cl, int required, int alive,
      int retryCount) {
    RetryDecision decision =
        (retryCount < unavailableAttempts) ? RetryDecision.RETRY_NEXT : RetryDecision.RETHROW;
    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_UNAVAILABLE, LOG_PREFIX, cl, required, alive, retryCount);
    }
    return decision;
  }

  @Override
  public RetryDecision onRequestAborted(Request request, Throwable error, int retryCount) {
    RetryDecision decision =
        (error instanceof ClosedConnectionException || error instanceof HeartbeatException)
            ? RetryDecision.RETRY_NEXT
            : RetryDecision.RETHROW;
    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_ABORTED, LOG_PREFIX, retryCount, error);
    }
    return decision;
  }

  @Override
  public RetryDecision onErrorResponse(Request request, CoordinatorException error,
      int retryCount) {
    RetryDecision decision =
        (error instanceof ReadFailureException || error instanceof WriteFailureException)
            ? RetryDecision.RETHROW
            : RetryDecision.RETRY_NEXT;
    if (decision == RetryDecision.RETRY_NEXT && LOG.isTraceEnabled()) {
      LOG.trace(RETRYING_ON_ERROR, LOG_PREFIX, retryCount, error);
    }
    return decision;
  }

  @Override
  public void close() {
    // Nothing to do
  }
}
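I am using DataStax Java driver 4.6.0. The problem is that I cannot pass an instance of this class through CqlSessionBuilder, which in older versions of the driver could be done like this:

  RetryPolicy rc = new CustomRetryPolicy(3, 3, 2);
  Cluster cluster = Cluster.builder().addContactPoint("192.168.0.0").withRetryPolicy(rc).build();

I have tried using DriverConfigLoader, but it only offers an option to pass the custom class name.
Could you suggest how to do this?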
1 Answer
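If you look at the implementation of DefaultRetryPolicy and at the CustomRetryPolicy example, you will see that both receive two parameters: a context of type DriverContext, and a string with the name of the profile. You can then use context to obtain the DriverConfig via a getConfig call, and then call getProfile on that config to get the configuration values your custom policy needs. You can put your own values into the configuration file and use them in the retry policy, like this: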
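The constructor pattern described above can be sketched roughly as follows. So that the example compiles without the driver on the classpath, the nested interfaces DriverContext, DriverConfig, DriverExecutionProfile and DriverOption are simplified stand-ins for the driver types of the same names, not the real ones. The option paths (for example `advanced.retry-policy.read-attempts`), the enum CustomRetryOption, the helper contextOf and the default of 1 attempt are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: the nested interfaces are simplified stand-ins for the driver
// types of the same names, so the example compiles without the driver jar.
final class ConfiguredRetryPolicySketch {

  /** Stand-in for the driver's DriverOption: identifies a configuration path. */
  interface DriverOption {
    String getPath();
  }

  /** Stand-in for DriverExecutionProfile: typed access to one profile's values. */
  interface DriverExecutionProfile {
    int getInt(DriverOption option, int defaultValue);
  }

  /** Stand-in for DriverConfig: looks up a profile by name. */
  interface DriverConfig {
    DriverExecutionProfile getProfile(String profileName);
  }

  /** Stand-in for DriverContext: the object handed to the policy constructor. */
  interface DriverContext {
    DriverConfig getConfig();
  }

  /** Hypothetical custom options; these paths are assumptions, not driver built-ins. */
  enum CustomRetryOption implements DriverOption {
    READ_ATTEMPTS("advanced.retry-policy.read-attempts"),
    WRITE_ATTEMPTS("advanced.retry-policy.write-attempts"),
    UNAVAILABLE_ATTEMPTS("advanced.retry-policy.unavailable-attempts");

    private final String path;

    CustomRetryOption(String path) {
      this.path = path;
    }

    @Override
    public String getPath() {
      return path;
    }
  }

  /** The policy reads its limits from the profile instead of taking three ints. */
  static final class CustomRetryPolicy {
    final int readAttempts;
    final int writeAttempts;
    final int unavailableAttempts;

    CustomRetryPolicy(DriverContext context, String profileName) {
      DriverExecutionProfile profile = context.getConfig().getProfile(profileName);
      // The fallback of 1 attempt is an arbitrary choice for this sketch.
      this.readAttempts = profile.getInt(CustomRetryOption.READ_ATTEMPTS, 1);
      this.writeAttempts = profile.getInt(CustomRetryOption.WRITE_ATTEMPTS, 1);
      this.unavailableAttempts = profile.getInt(CustomRetryOption.UNAVAILABLE_ATTEMPTS, 1);
    }
  }

  /** Builds an in-memory context from path-to-value pairs, for demonstration only. */
  static DriverContext contextOf(Map<String, Integer> values) {
    DriverExecutionProfile profile =
        (option, defaultValue) -> values.getOrDefault(option.getPath(), defaultValue);
    DriverConfig config = profileName -> profile;
    return () -> config;
  }

  public static void main(String[] args) {
    Map<String, Integer> values = new HashMap<>();
    values.put("advanced.retry-policy.read-attempts", 3);
    values.put("advanced.retry-policy.write-attempts", 3);
    // unavailable-attempts is left unset, so the fallback value applies.
    CustomRetryPolicy policy = new CustomRetryPolicy(contextOf(values), "default");
    System.out.println(policy.readAttempts + " " + policy.writeAttempts + " "
        + policy.unavailableAttempts);
  }
}
```

With the real driver, the policy class would still be registered by name (the `advanced.retry-policy.class` setting), and the retry limits would sit next to it in the same configuration file, so nothing has to be passed to CqlSessionBuilder as an object. The real policy would keep implementing RetryPolicy with the callbacks from the question; only the constructor changes.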