本文整理了Java中scala.concurrent.Await类的一些代码示例,展示了Await类的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Await类的具体详情如下:
包路径:scala.concurrent.Await
类名称:Await
暂无
代码示例来源:origin: apache/usergrid
/**
 * Sends a message to the local routers through the client actor and blocks for the reply.
 *
 * <p>Each attempt asks the client actor with a one-second timeout and waits for the answer.
 * An attempt that times out, fails, or yields anything other than a {@code QueueAckResponse}
 * is logged and retried, up to five attempts in total.
 *
 * @param message the message to hand to the client actor
 * @return the status carried by the first {@code QueueAckResponse} received
 * @throws QakkaRuntimeException if no {@code QueueAckResponse} arrives within the allowed
 *         attempts, or if the calling thread is interrupted while waiting
 */
private Status sendMessageToLocalRouters( QakkaMessage message ) {

    final int maxRetries = 5;

    for ( int attempt = 1; attempt <= maxRetries; attempt++ ) {
        try {
            Timeout t = new Timeout( 1, TimeUnit.SECONDS );

            // ask ClientActor and wait (up to timeout) for response
            Future<Object> fut = Patterns.ask( actorSystemManager.getClientActor(), message, t );
            final QakkaMessage response = (QakkaMessage)Await.result( fut, t.duration() );

            // instanceof is already false for null, so no separate null check is needed here
            if ( response instanceof QueueAckResponse ) {
                return ( (QueueAckResponse)response ).getStatus();

            } else if ( response != null ) {
                logger.debug("UNKNOWN RESPONSE sending message, retrying {}", attempt );

            } else {
                logger.trace("TIMEOUT sending message, retrying {}", attempt );
            }

        } catch ( TimeoutException e ) {
            logger.trace( "TIMEOUT sending message, retrying " + attempt, e );

        } catch ( InterruptedException e ) {
            // Restore the interrupt flag and stop retrying: an interrupt is a request to
            // cancel, and a further blocking wait would only be interrupted again.
            Thread.currentThread().interrupt();
            throw new QakkaRuntimeException(
                "Interrupted while sending message " + message );

        } catch ( Exception e ) {
            logger.debug("ERROR sending message, retrying " + attempt, e );
        }
    }

    // Report the number of attempts actually made (the old loop counter overshot by one).
    throw new QakkaRuntimeException(
        "Error sending message " + message + " after " + maxRetries + " attempts" );
}
代码示例来源:origin: apache/flink
LOG.debug("keytabPath: {}", keytabPath);
config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
config.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
final String amPortRange = config.getString(
YarnConfigOptions.APPLICATION_MASTER_PORT);
LOG);
ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);
actorSystem.actorOf(
Props.create(ProcessReaper.class, resourceMaster, LOG, ACTOR_DIED_EXIT_CODE),
"YARN_Resource_Master_Process_Reaper");
actorSystem.actorOf(
Props.create(ProcessReaper.class, jobManager, LOG, ACTOR_DIED_EXIT_CODE),
"JobManager_Process_Reaper");
Await.ready(actorSystem.whenTerminated(), Duration.Inf());
} catch (InterruptedException | TimeoutException e) {
LOG.error("Error shutting down actor system", e);
AkkaUtils.getTimeout(config).toMillis(),
TimeUnit.MILLISECONDS,
futureExecutor,
代码示例来源:origin: apache/flink
new JavaTestKit(system) {{
final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow();
Configuration flinkConfig = new Configuration();
YarnConfiguration yarnConfig = new YarnConfiguration();
SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService(
leader1 = system.actorOf(
Props.create(
TestingUtils.ForwardingActor.class,
));
resourceManager = system.actorOf(
Props.create(
TestingYarnFlinkResourceManager.class,
Await.ready(taskManagerRegisteredFuture, deadline.timeLeft());
int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
代码示例来源:origin: apache/flink
zkServer.getConnectString(),
rootFolder.getPath());
configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTM);
configuration.setString(AkkaOptions.ASK_TIMEOUT, AkkaUtils.INF_TIMEOUT().toString());
receiver.setSlotSharingGroup(slotSharingGroup);
final JobGraph graph = new JobGraph("Blocking test job", sender, receiver);
cluster.start();
clientActorSystem = cluster.startJobClientActorSystem(graph.getJobID());
Deadline deadline = timeout.$times(3).fromNow();
Future<Object> future = jm.ask(new WaitForAllVerticesToBeRunningOrFinished(graph.getJobID()), deadline.timeLeft());
Await.ready(future, deadline.timeLeft());
Await.result(jobSubmission.resultPromise.future(), deadline.timeLeft());
代码示例来源:origin: apache/flink
try {
Future<Object> response =
Patterns.ask(
applicationClient.get(),
YarnMessages.getLocalGetYarnMessage(),
new Timeout(akkaDuration));
result = Await.result(response, akkaDuration);
} catch (Exception ioe) {
LOG.warn("Error retrieving the YARN messages locally", ioe);
代码示例来源:origin: wxyyxc1992/Backend-Boilerplates
/**
 * Starts the calculator actor system, asks the arithmetic service to evaluate
 * {@code (3 + 5) / (2 * (1 + 1))}, prints the answer and shuts the system down.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the ask times out or fails, or if waiting for termination fails
 */
public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("calculator-system");
    try {
        ActorRef calculatorService =
            system.actorOf(Props.create(ArithmeticService.class), "arithmetic-service");

        // (3 + 5) / (2 * (1 + 1))
        Expression task = new Divide(
            new Add(new Const(3), new Const(5)),
            new Multiply(
                new Const(2),
                new Add(new Const(1), new Const(1))
            )
        );

        // The same one-second limit bounds both the ask and the blocking wait.
        FiniteDuration duration = Duration.create(1, TimeUnit.SECONDS);
        Integer result = Await.result(
            ask(calculatorService, task, new Timeout(duration)).mapTo(classTag(Integer.class)),
            duration);
        System.out.println("Got result: " + result);
    } finally {
        // Terminate even when the ask fails or times out; otherwise the exception would
        // skip the shutdown and leave the actor system running.
        Await.ready(system.terminate(), Duration.Inf());
    }
}
}
代码示例来源:origin: apache/flink
Configuration config = new Configuration();
config.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, retainedCheckpoints);
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
system.terminate();
Await.ready(system.whenTerminated(), Duration.Inf());
代码示例来源:origin: dataArtisans/yahoo-streaming-benchmark
FiniteDuration askTimeout;
Duration lookupDuration = FiniteDuration.create(lookupTimeoutStr);
Duration duration = FiniteDuration.create(queryTimeoutStr);
askTimeout = queryTimeout.mul(queryAttempts);
ActorSystem actorSystem = ActorSystem.create("AkkaStateQuery", AkkaUtils.getDefaultAkkaConfig("", 0));
ActorRef queryActor = actorSystem.actorOf(
Props.create(
QueryActor.class,
Future<Object> futureResult = Patterns.ask(
queryActor,
new QueryState<>(time, campaigns.get(campaignId++)),
new Timeout(askTimeout));
Object result = Await.result(futureResult, askTimeout);
Future<Object> futureResult = Patterns.ask(
queryActor,
new QueryState<>(timestamp, key),
new Timeout(askTimeout));
Object result = Await.result(futureResult, askTimeout);
代码示例来源:origin: wxyyxc1992/Backend-Boilerplates
/**
 * Shows two ways of consuming the reply to an ask: blocking on the future, and piping the
 * future to another actor without waiting.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the blocking wait times out or the ask fails
 */
public static void main(String[] args) throws Exception {
    final ActorSystem actorSystem = ActorSystem.create("strategy", ConfigFactory.load("akka.config"));
    final ActorRef printer = actorSystem.actorOf(Props.create(PrintActor.class), "PrintActor");
    final ActorRef worker = actorSystem.actorOf(Props.create(WorkerActor.class), "WorkerActor");

    // Blocking variant: wait for the Future to complete and read its value.
    final Future<Object> blockingReply = Patterns.ask(worker, 5, 1000);
    final Object answer = Await.result(blockingReply, Duration.create(3, TimeUnit.SECONDS));
    final int result = (int) answer;
    System.out.println("result:" + result);

    // Non-blocking variant: do not wait for the value; when the reply arrives it is
    // forwarded to printActor.
    final Future<Object> pipedReply = Patterns.ask(worker, 8, 1000);
    Patterns.pipe(pipedReply, actorSystem.dispatcher()).to(printer);

    worker.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
}
代码示例来源:origin: vy/fiber-test
.collect(Collectors.joining(", "));
Config config = ConfigFactory.parseString(configText);
ActorSystem system = ActorSystem.create(AkkaActorRingBenchmark.class.getSimpleName() + "System", config);
Props actorProps = Props.create(InternalActor.class, workerIndex, promise);
String actorName = "AkkaActor-" + workerIndex;
actors[workerIndex] = system.actorOf(
actorProps,
actorName);
Iterable<Integer> sequences = Await.result(
Futures.sequence(futures, system.dispatcher()),
Duration.apply(10, TimeUnit.MINUTES));
代码示例来源:origin: eBay/parallec
+ PcDateUtils.getNowDateTimeStr());
executionManager = ActorConfig.createAndGetActorSystem().actorOf(
Props.create(ExecutionManager.class, task),
"ExecutionManager-" + task.getTaskId());
Future<Object> future = Patterns.ask(executionManager,
new InitialRequestToManager(task), new Timeout(duration));
commandResponseFromManager = (ResponseFromManager) Await.result(
future, duration);
ActorConfig.createAndGetActorSystem().stop(executionManager);
代码示例来源:origin: ajmalbabu/distributed-computing
/**
 * Asks the given actor for {@code GET_RESPONSE} and blocks until the reply arrives.
 *
 * <p>The same interval is used both as the ask timeout and as the maximum blocking wait.
 *
 * @param actorRef the actor to query
 * @param delayIntervalMillis the time limit in milliseconds
 * @return the reply produced by the actor
 * @throws Exception if the wait times out or the ask fails
 */
private static Object blockedResponse(ActorRef actorRef, long delayIntervalMillis) throws Exception {
    final FiniteDuration waitLimit = FiniteDuration.create(delayIntervalMillis, TimeUnit.MILLISECONDS);
    final Timeout askLimit = Timeout.durationToTimeout(waitLimit);
    return Await.result(ask(actorRef, GET_RESPONSE, askLimit), waitLimit);
}
}
代码示例来源:origin: aliakh/demo-akka-spring
/**
 * Creates the worker actor, sends it three one-way requests followed by one ask, logs the
 * reply to the ask, and finally terminates the actor system and waits for it to stop.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the ask times out or fails, or if waiting for termination fails
 */
@Override
public void run(String[] args) throws Exception {
    try {
        final ActorRef worker = actorSystem.actorOf(springExtension.props("workerActor"), "worker-actor");

        // Three fire-and-forget requests, sent without a sender reference.
        for (int sent = 0; sent < 3; sent++) {
            worker.tell(new WorkerActor.Request(), null);
        }

        // One limit serves as both the ask timeout and the blocking wait.
        final FiniteDuration waitLimit = FiniteDuration.create(1, TimeUnit.SECONDS);
        final Timeout askLimit = Timeout.durationToTimeout(waitLimit);
        final Future<Object> reply = Patterns.ask(worker, new WorkerActor.Response(), askLimit);
        logger.info("Response: " + Await.result(reply, waitLimit));
    } finally {
        // Shut down regardless of the outcome above and block until the system has stopped.
        actorSystem.terminate();
        Await.result(actorSystem.whenTerminated(), Duration.Inf());
    }
}
}
代码示例来源:origin: com.alibaba.blink/flink-runtime
Await.ready(jobSubmissionFuture, askTimeout);
} catch (InterruptedException e) {
throw new JobExecutionException(
} catch (TimeoutException e) {
try {
Await.result(
Patterns.ask(
jobClientActor,
Timeout.durationToTimeout(askTimeout)),
askTimeout);
try {
answer = Await.result(jobSubmissionFuture, Duration.Zero());
代码示例来源:origin: org.apache.servicecomb.saga/saga-core-akka
/**
 * Terminates the actor system and blocks, with no upper time limit, until the future
 * returned by {@code actorSystem.terminate()} completes.
 *
 * @throws Exception if {@code Await.result} fails while waiting for the termination
 */
@Override
public void terminate() throws Exception {
Await.result(actorSystem.terminate(), Duration.Inf());
}
}
代码示例来源:origin: apache/flink
zkServer.getConnectString(),
rootFolder.getPath());
configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
timeout);
int numRegisteredTMs = (Integer) Await.result(registeredTMs, timeout);
代码示例来源:origin: org.opendaylight.controller/sal-distributed-datastore
/**
 * Asks the shard manager for {@code GetLocalShardIds.INSTANCE} and returns its reply as a
 * list of strings.
 *
 * <p>The ask itself is bounded by {@code ASK_TIMEOUT_MILLIS}; the blocking wait on the
 * resulting future has no separate limit.
 *
 * @return the reply of the shard manager, cast to {@code List<String>}
 */
@SuppressWarnings("unchecked")
@Override
public List<String> getLocalShards() {
    try {
        final Object reply = Await.result(
            Patterns.ask(shardManager, GetLocalShardIds.INSTANCE, ASK_TIMEOUT_MILLIS),
            Duration.Inf());
        return (List<String>) reply;
    } catch (Exception e) {
        // Any failure is handed to Throwables.propagate and rethrown from here.
        throw Throwables.propagate(e);
    }
}
代码示例来源:origin: keeps/roda
/**
 * Shuts down the JOBS actor system and waits up to 30 seconds for it to stop.
 *
 * <p>A completion callback logs the outcome of the termination. The method then blocks on
 * the system's termination future; a timeout, an interrupt or any other failure during that
 * wait is logged and the method returns normally.
 */
@Override
public void shutdown() {
    LOGGER.info("Going to shutdown JOBS actor system");
    Future<Terminated> terminate = jobsSystem.terminate();
    terminate.onComplete(new OnComplete<Terminated>() {
        @Override
        public void onComplete(Throwable failure, Terminated result) {
            if (failure != null) {
                LOGGER.error("Error while shutting down JOBS actor system", failure);
            } else {
                LOGGER.info("Done shutting down JOBS actor system");
            }
        }
    }, jobsSystem.dispatcher());

    try {
        LOGGER.info("Waiting up to 30 seconds for JOBS actor system to shutdown");
        Await.result(jobsSystem.whenTerminated(), Duration.create(30, "seconds"));
    } catch (TimeoutException e) {
        LOGGER.warn("JOBS Actor system shutdown wait timed out, continuing...");
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe the
        // interruption instead of having it silently swallowed here.
        Thread.currentThread().interrupt();
        LOGGER.warn("Interrupted while waiting for JOBS actor system to shutdown, continuing...");
    } catch (Exception e) {
        LOGGER.error("Error while shutting down JOBS actor system", e);
    }
}
代码示例来源:origin: org.apache.flink/flink-runtime_2.10
0);
port = configuration.getInteger(JobManagerOptions.PORT);
TaskManager.class);
Future<Object> registrationFuture = Patterns.ask(
taskManager,
TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
timeout.toMillis());
Await.ready(registrationFuture, timeout);
代码示例来源:origin: apache/flink
/**
 * Terminates the actor system, waits without a time limit until it has stopped, and clears
 * the reference. Does nothing when {@code isLoaded()} returns false.
 *
 * @throws Exception if waiting for the termination fails
 */
@Override
public void close() throws Exception {
    if (!isLoaded()) {
        return;
    }

    actorSystem.terminate();
    Await.ready(actorSystem.whenTerminated(), Duration.Inf());
    // Drop the reference once the system has stopped.
    actorSystem = null;
}
内容来源于网络,如有侵权,请联系作者删除!