代码示例来源:origin: twitter/distributedlog
// Command entry point that receives a (DistributedLogClient, MonitorServiceClient) pair.
// The visible body ignores the pair and reports 0.
protected int runCmd(Pair<DistributedLogClient, MonitorServiceClient> client)
throws Exception {
return 0;
代码示例来源:origin: twitter/distributedlog
protected int runCmd(DistributedLogClient client) throws Exception {
System.out.println("Truncating streams : " + streams);
for (String stream : streams) {
boolean success = Await.result(client.truncate(stream, dlsn));
System.out.println("Truncate " + stream + " to " + dlsn + " : " + success);
return 0;
代码示例来源:origin: twitter/distributedlog
* Wait for the acquire result.
* @return true if acquired successfully, otherwise false.
public boolean waitForAcquireQuietly() {
boolean success = false;
try {
success = Await.result(acquireFuture);
} catch (InterruptedException ie) {
} catch (LockTimeoutException lte) {
logger.debug("Timeout on lock acquiring", lte);
} catch (Exception e) {
logger.error("Caught exception waiting for lock acquired", e);
return success;
代码示例来源:origin: twitter/distributedlog
* Await for the transmit to be complete
* @param timeout
* wait timeout
* @param unit
* wait timeout unit
int awaitTransmitComplete(long timeout, TimeUnit unit)
throws Exception {
return Await.result(transmitComplete,
Duration.fromTimeUnit(timeout, unit));
代码示例来源:origin: twitter/distributedlog
long countToLastRecord(DistributedLogManager dlm) throws Exception {
return Await.result(dlm.getLogRecordCountAsync(startDLSN)).longValue();
代码示例来源:origin: twitter/distributedlog
* Wait for the result of a lock operation.
* @param result result to wait
* @param lockPath path of the lock
* @return the result
* @throws LockingException when encountered exceptions on the result of lock operation
public static <T> T lockResult(Future<T> result, String lockPath) throws LockingException {
try {
return Await.result(result);
} catch (LockingException le) {
throw le;
} catch (Exception e) {
throw new LockingException(lockPath, "Encountered exception on locking ", e);
代码示例来源:origin: twitter/distributedlog
public void unlock() {
Future<BoxedUnit> unlockResult = asyncUnlock();
try {
Await.result(unlockResult, Duration.fromMilliseconds(lockOpTimeout));
} catch (TimeoutException toe) {
// This shouldn't happen unless we lose a watch, and may result in a leaked lock.
LOG.error("Timeout unlocking {} owned by {} : ", new Object[] { lockPath, lockId, toe });
} catch (Exception e) {
LOG.warn("{} failed to unlock {} : ", new Object[] { lockId, lockPath, e });
代码示例来源:origin: twitter/distributedlog
// Pulls records from the reader one at a time with Await.result(reader.readNext()) and loops
// while the record is non-null; prints "No records." when numRead is still 0.
// NOTE(review): this excerpt looks elided — nothing visible increments numRead or prints a
// record, and the blocks are not closed. Confirm against the full source before changing it.
private void dumpRecords(AsyncLogReader reader) throws Exception {
int numRead = 0;
LogRecord record = Await.result(reader.readNext());
while (record != null) {
// dump the record
if (numRead >= count) {
record = Await.result(reader.readNext());
if (numRead == 0) {
System.out.println("No records.");
} else {
代码示例来源:origin: twitter/distributedlog
public void force(boolean metadata) throws IOException {
long pos = 0;
try {
pos = Await.result(logWriter.flushAndCommit());
} catch (IOException ioe) {
throw ioe;
} catch (Exception ex) {
LOG.error("unexpected exception in AppendOnlyStreamWriter.force ", ex);
throw new UnexpectedException("unexpected exception in AppendOnlyStreamWriter.force", ex);
synchronized (syncPos) {
syncPos[0] = pos;
代码示例来源:origin: twitter/distributedlog
// Marks the end of the stream. IOExceptions are rethrown as-is; any other exception is
// wrapped in an UnexpectedException with the original as its cause.
// NOTE(review): the try body is empty in this excerpt — the actual operation appears to have
// been elided; confirm against the full source.
public void markEndOfStream() throws IOException {
try {
} catch (IOException ioe) {
throw ioe;
} catch (Exception ex) {
throw new UnexpectedException("Mark end of stream hit unexpected exception", ex);
代码示例来源:origin: twitter/distributedlog
// Iterates over the configured streams, announcing each release on stdout; on failure it
// reports the stream on stderr and rethrows. Returns 0.
// NOTE(review): rateLimiter is created but never used in this excerpt, and no release call is
// visible inside the try — statements appear to have been elided; confirm against the full source.
protected int runCmd(DistributedLogClient client) throws Exception {
RateLimiter rateLimiter = RateLimiter.create(rate);
for (String stream : streams) {
try {
System.out.println("Release ownership of stream " + stream);
} catch (Exception e) {
System.err.println("Failed to release ownership of stream " + stream);
throw e;
return 0;
代码示例来源:origin: twitter/distributedlog
* Await for the result of the future and thrown bk related exceptions.
* @param result future to wait for
* @return the result of future
* @throws BKException when exceptions are thrown by the future. If there is unkown exceptions
* thrown from the future, the exceptions will be wrapped into
* {@link org.apache.bookkeeper.client.BKException.BKUnexpectedConditionException}.
public static <T> T bkResult(Future<T> result) throws BKException {
try {
return Await.result(result);
} catch (BKException bke) {
throw bke;
} catch (InterruptedException ie) {
throw BKException.create(BKException.Code.InterruptedException);
} catch (Exception e) {
logger.warn("Encountered unexpected exception on waiting bookkeeper results : ", e);
throw BKException.create(BKException.Code.UnexpectedConditionException);
代码示例来源:origin: twitter/distributedlog
// Releases the stream on srcClient, then chains a targetMonitor.check(streamName) call and
// blocks on the combined future. The listener logs the outcome; note that both success and
// failure are logged at info level.
// NOTE(review): the closing of the anonymous classes and of the Await.result(...) call is not
// visible in this excerpt — confirm the tail against the full source.
private void doMoveStream(final String streamName) throws Exception {
Await.result(srcClient.release(streamName).flatMap(new Function<Void, Future<Void>>() {
public Future<Void> apply(Void result) {
return targetMonitor.check(streamName).addEventListener(new FutureEventListener<Void>() {
public void onSuccess(Void value) {
logger.info("Moved stream {} from {} to {}.",
new Object[]{streamName, source, target});
public void onFailure(Throwable cause) {
logger.info("Failed to move stream {} from region {} to {} : ",
new Object[]{streamName, source, target, cause});
代码示例来源:origin: twitter/distributedlog
// Test helper: when an exception is caught, logs the expected and actual exception class
// names and asserts that the caught exception is an instance of exClass.
// NOTE(review): the try body is empty in this excerpt — the statement that waits on `future`
// appears to have been elided; confirm against the full source.
public static <T> void validateFutureFailed(Future<T> future, Class exClass) {
try {
} catch (Exception ex) {
LOG.info("Expected: {} Actual: {}", exClass.getName(), ex.getClass().getName());
assertTrue("exceptions types equal", exClass.isInstance(ex));
代码示例来源:origin: twitter/distributedlog
// Body of a runnable handed to an enclosing call together with the name "read-thread".
// ReadCancelledException is caught with no visible handling; any other Throwable is logged
// at error level with the stream name.
// NOTE(review): the try body and the enclosing expression are not visible in this excerpt.
public void run() {
try {
} catch (ReadCancelledException rce) {
} catch (Throwable t) {
LOG.error("Receive unexpected exception on reading stream {} : ", name, t);
}, "read-thread");
代码示例来源:origin: twitter/distributedlog
// Moves a stream between hosts: logs the intent, releases the stream through the source
// host's client, then chains a check on the destination host's monitor and blocks on the
// combined future. The listener logs the outcome; both success and failure use info level.
// NOTE(review): the closing of the anonymous classes and of the Await.result(...) call is not
// visible in this excerpt — confirm the tail against the full source.
private void doMoveStream(final String stream, final Host from, final Host to) throws Exception {
logger.info("Moving stream {} from {} to {}.",
new Object[] { stream, from.address, to.address });
Await.result(from.getClient().release(stream).flatMap(new Function<Void, Future<Void>>() {
public Future<Void> apply(Void result) {
logger.info("Released stream {} from {}.", stream, from.address);
return to.getMonitor().check(stream).addEventListener(new FutureEventListener<Void>() {
public void onSuccess(Void value) {
logger.info("Moved stream {} from {} to {}.",
new Object[] { stream, from.address, to.address });
public void onFailure(Throwable cause) {
logger.info("Failed to move stream {} from {} to {} : ",
new Object[] { stream, from.address, to.address, cause });
代码示例来源:origin: twitter/distributedlog
// Reads up to 300 records from the reader, blocking on each readNext(); any exception sets
// the `failed` flag instead of propagating.
// NOTE(review): whatever is done with each record, and the finally body, are not visible in
// this excerpt — confirm against the full source.
private void readEntries(AsyncLogReader reader) {
try {
for (int i = 0; i < 300; i++) {
LogRecordWithDLSN record = Await.result(reader.readNext());
} catch (Exception ex) {
failed = true;
} finally {
代码示例来源:origin: twitter/distributedlog
protected ZKAccessControl getZKAccessControl(ZooKeeperClient zkc, String zkPath) throws Exception {
ZKAccessControl accessControl;
try {
accessControl = Await.result(ZKAccessControl.read(zkc, zkPath, null));
} catch (KeeperException.NoNodeException nne) {
accessControl = new ZKAccessControl(new AccessControlEntry(), zkPath);
return accessControl;
代码示例来源:origin: twitter/distributedlog
public static <T> T validateFutureSucceededAndGetResult(Future<T> future) throws Exception {
try {
return Await.result(future, Duration.fromSeconds(10));
} catch (Exception ex) {
fail("unexpected exception " + ex.getClass().getName());
throw ex;
代码示例来源:origin: twitter/distributedlog
protected void balanceFromSource(DistributedLogClientBuilder clientBuilder,
ClusterBalancer balancer,
String source,
Optional<RateLimiter> rateLimiter)
throws Exception {
InetSocketAddress sourceAddr = DLSocketAddress.parseSocketAddress(source);
DistributedLogClientBuilder sourceClientBuilder =
Pair<DistributedLogClient, MonitorServiceClient> clientPair =
try {
logger.info("Disable accepting new stream on proxy {}.", source);
balancer.balanceAll(source, rebalanceConcurrency, rateLimiter);
} finally {