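This article collects code examples for the Java method org.elasticsearch.action.ActionListener.wrap() and shows how ActionListener.wrap() is used in practice. The examples were extracted from selected projects on platforms such as GitHub, Stack Overflow, and Maven, so they are useful as a reference. Details of the ActionListener.wrap() method are as follows: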
Package: org.elasticsearch.action
Class name: ActionListener
Method name: wrap
Description: Creates a listener that listens for a response (or failure) and executes the corresponding runnable when the response (or failure) is received.
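The description above can be made concrete with a small, self-contained sketch. It does not use the Elasticsearch classes: `SketchListener`, its nested `CheckedConsumer`, and `WrapBasicsDemo` are hypothetical stand-ins written for illustration, assuming the two-argument `wrap` routes an exception thrown by the response handler to the failure handler and the `Runnable` overload runs the same code on either outcome.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for org.elasticsearch.action.ActionListener (not the real class). */
interface SketchListener<T> {
    void onResponse(T response);

    void onFailure(Exception e);

    /** Stand-in for a consumer that is allowed to throw a checked exception. */
    interface CheckedConsumer<T> {
        void accept(T t) throws Exception;
    }

    /** Builds a listener from two lambdas; an exception thrown by onResponse goes to onFailure. */
    static <T> SketchListener<T> wrap(CheckedConsumer<T> onResponse, Consumer<Exception> onFailure) {
        return new SketchListener<T>() {
            @Override
            public void onResponse(T response) {
                try {
                    onResponse.accept(response);
                } catch (Exception e) {
                    onFailure(e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                onFailure.accept(e);
            }
        };
    }

    /** Runs the same runnable on success and on failure, mirroring wrap(Runnable). */
    static <T> SketchListener<T> wrap(Runnable runnable) {
        return wrap(r -> runnable.run(), e -> runnable.run());
    }
}

final class WrapBasicsDemo {
    /** Drives the wrapped listeners and records which handler ran. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();

        SketchListener<String> twoArg = SketchListener.wrap(
            r -> events.add("response:" + r),
            e -> events.add("failure:" + e.getMessage()));
        twoArg.onResponse("ok");
        twoArg.onFailure(new IllegalStateException("boom"));

        // The response handler throws, so the failure handler receives the exception.
        SketchListener<String> throwing = SketchListener.wrap(
            r -> { throw new java.io.IOException("bad " + r); },
            e -> events.add("routed:" + e.getMessage()));
        throwing.onResponse("x");

        // Runnable overload: the same action runs for both outcomes.
        SketchListener<String> cleanup = SketchListener.wrap(() -> events.add("cleanup"));
        cleanup.onResponse("ignored");
        cleanup.onFailure(new RuntimeException("ignored"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The `Runnable` form suits cleanup work that must happen regardless of the outcome, which is how the close-listener examples further down use it.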
Code example source: org.elasticsearch/elasticsearch
/**
 * Creates a listener that listens for a response (or failure) and executes the
 * corresponding runnable when the response (or failure) is received.
 *
 * @param runnable the runnable that will be called in event of success or failure
 * @param <Response> the type of the response
 * @return a listener that listens for responses and invokes the runnable when received
 */
static <Response> ActionListener<Response> wrap(Runnable runnable) {
    return wrap(r -> runnable.run(), e -> runnable.run());
}
Code example source: org.elasticsearch/elasticsearch
void updateRemoteCluster(
        final String clusterAlias,
        final List<String> addresses,
        final String proxyAddress,
        final ActionListener<Void> connectionListener) {
    final List<Tuple<String, Supplier<DiscoveryNode>>> nodes =
        addresses.stream().<Tuple<String, Supplier<DiscoveryNode>>>map(address -> Tuple.tuple(address, () ->
            buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress)))
        ).collect(Collectors.toList());
    updateRemoteClusters(Collections.singletonMap(clusterAlias, new Tuple<>(proxyAddress, nodes)), connectionListener);
}
Code example source: org.elasticsearch/elasticsearch
protected void serverAcceptedChannel(TcpChannel channel) {
    boolean addedOnThisCall = acceptedChannels.add(channel);
    assert addedOnThisCall : "Channel should only be added to accepted channel set once";
    // Mark the channel init time
    channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
    channel.addCloseListener(ActionListener.wrap(() -> acceptedChannels.remove(channel)));
    logger.trace(() -> new ParameterizedMessage("Tcp transport channel accepted: {}", channel));
}
Code example source: org.elasticsearch/elasticsearch
/**
 * Fetches all shards for the search request from this remote connection. This is used to later run the search on the remote end.
 */
public void fetchSearchShards(ClusterSearchShardsRequest searchRequest,
                              ActionListener<ClusterSearchShardsResponse> listener) {
    final ActionListener<ClusterSearchShardsResponse> searchShardsListener;
    final Consumer<Exception> onConnectFailure;
    if (skipUnavailable) {
        onConnectFailure = (exception) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY);
        searchShardsListener = ActionListener.wrap(listener::onResponse, (e) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY));
    } else {
        onConnectFailure = listener::onFailure;
        searchShardsListener = listener;
    }
    // in case we have no connected nodes we try to connect and if we fail we either notify the listener or not depending on
    // the skip_unavailable setting
    ensureConnected(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, searchShardsListener), onConnectFailure));
}
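The skip_unavailable handling in the example above turns a failure into an empty response instead of propagating it. A minimal sketch of that idea follows, using a simplified hypothetical `FallbackListener` rather than the Elasticsearch interface; the names `lenient` and `observe` and the `List<String>` payload are assumptions made for illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified stand-in for ActionListener; not the Elasticsearch class. */
interface FallbackListener<T> {
    void onResponse(T response);

    void onFailure(Exception e);

    static <T> FallbackListener<T> wrap(Consumer<T> onResponse, Consumer<Exception> onFailure) {
        return new FallbackListener<T>() {
            @Override
            public void onResponse(T response) {
                onResponse.accept(response);
            }

            @Override
            public void onFailure(Exception e) {
                onFailure.accept(e);
            }
        };
    }
}

final class SkipUnavailableDemo {
    /** Returns a listener that reports failures as an empty result when skipUnavailable is true. */
    static FallbackListener<List<String>> lenient(FallbackListener<List<String>> delegate, boolean skipUnavailable) {
        if (skipUnavailable == false) {
            return delegate; // failures propagate unchanged
        }
        return FallbackListener.wrap(delegate::onResponse, e -> delegate.onResponse(Collections.emptyList()));
    }

    /** Simulates a failed remote call and returns what the caller finally observed. */
    static String observe(boolean skipUnavailable) {
        StringBuilder seen = new StringBuilder();
        FallbackListener<List<String>> caller = FallbackListener.wrap(
            shards -> seen.append("response:").append(shards),
            e -> seen.append("failure:").append(e.getMessage()));
        lenient(caller, skipUnavailable).onFailure(new RuntimeException("remote down"));
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(observe(true));
        System.out.println(observe(false));
    }
}
```

With the flag set, the caller sees an empty response; without it, the original failure reaches the caller untouched.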
Code example source: org.elasticsearch/elasticsearch
@Override
protected void doExecute(RemoteInfoRequest remoteInfoRequest, ActionListener<RemoteInfoResponse> listener) {
    remoteClusterService.getRemoteConnectionInfos(ActionListener.wrap(remoteConnectionInfos
        -> listener.onResponse(new RemoteInfoResponse(remoteConnectionInfos)), listener::onFailure));
}
Code example source: org.elasticsearch/elasticsearch
/**
 * Notifies the master node to create new persistent task and to assign it to a node.
 */
public <Params extends PersistentTaskParams> void sendStartRequest(final String taskId,
                                                                   final String taskName,
                                                                   final Params taskParams,
                                                                   final ActionListener<PersistentTask<Params>> listener) {
    @SuppressWarnings("unchecked")
    final ActionListener<PersistentTask<?>> wrappedListener =
        ActionListener.wrap(t -> listener.onResponse((PersistentTask<Params>) t), listener::onFailure);
    StartPersistentTaskAction.Request request = new StartPersistentTaskAction.Request(taskId, taskName, taskParams);
    execute(request, StartPersistentTaskAction.INSTANCE, wrappedListener);
}
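Many of the examples on this page share one shape: the response is converted to another type and handed to the caller's listener, while failures are forwarded unchanged through `listener::onFailure`. The sketch below isolates that adapter pattern with a hypothetical `MappingListener` stand-in; the helper `map` and the sample conversion are assumptions for illustration and are not part of the Elasticsearch API shown here.

```java
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical, simplified stand-in for ActionListener; not the Elasticsearch class. */
interface MappingListener<T> {
    void onResponse(T response);

    void onFailure(Exception e);

    static <T> MappingListener<T> wrap(Consumer<T> onResponse, Consumer<Exception> onFailure) {
        return new MappingListener<T>() {
            @Override
            public void onResponse(T response) {
                onResponse.accept(response);
            }

            @Override
            public void onFailure(Exception e) {
                onFailure.accept(e);
            }
        };
    }
}

final class ResponseMappingDemo {
    /** Adapts a listener of Out into a listener of In by converting each response; failures pass through. */
    static <In, Out> MappingListener<In> map(MappingListener<Out> delegate, Function<In, Out> convert) {
        return MappingListener.wrap(r -> delegate.onResponse(convert.apply(r)), delegate::onFailure);
    }

    /** Sends one success and one failure through the adapter and returns what the caller saw. */
    static String demo() {
        StringBuilder seen = new StringBuilder();
        MappingListener<String> caller = MappingListener.wrap(
            s -> seen.append("response:").append(s).append(';'),
            e -> seen.append("failure:").append(e.getMessage()).append(';'));

        MappingListener<Integer> inner = map(caller, (Integer n) -> "took " + n + "ms");
        inner.onResponse(42);
        inner.onFailure(new IllegalStateException("rejected"));
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```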
Code example source: org.elasticsearch/elasticsearch
public final void run() {
    final ScrollIdForNode[] context = scrollId.getContext();
    if (context.length == 0) {
        listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY));
    } else {
        collectNodesAndRun(Arrays.asList(context), nodes, searchTransportService, ActionListener.wrap(lookup -> run(lookup, context),
            listener::onFailure));
    }
}
Code example source: org.elasticsearch/elasticsearch
ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
    if (itemResponses.isEmpty()) {
        return ActionListener.wrap(
            response -> actionListener.onResponse(new BulkResponse(response.getItems(),
                response.getTook().getMillis(), ingestTookInMillis)),
            actionListener::onFailure);
    } else {
        return new IngestBulkResponseListener(ingestTookInMillis, originalSlots, itemResponses, actionListener);
    }
}
Code example source: org.elasticsearch/elasticsearch
/**
 * Executes an asynchronous persistent task action using the client.
 * <p>
 * The origin is set in the context and the listener is wrapped to ensure the proper context is restored
 */
private <Req extends ActionRequest, Resp extends PersistentTaskResponse, Builder extends ActionRequestBuilder<Req, Resp, Builder>>
    void execute(final Req request, final Action<Req, Resp, Builder> action, final ActionListener<PersistentTask<?>> listener) {
    try {
        client.execute(action, request,
            ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure));
    } catch (Exception e) {
        listener.onFailure(e);
    }
}
Code example source: org.elasticsearch/elasticsearch
void registerNodeConnection(List<TcpChannel> nodeChannels, ConnectionProfile connectionProfile) {
    TimeValue pingInterval = connectionProfile.getPingInterval();
    if (pingInterval.millis() < 0) {
        return;
    }
    final ScheduledPing scheduledPing = pingIntervals.computeIfAbsent(pingInterval, ScheduledPing::new);
    scheduledPing.ensureStarted();
    for (TcpChannel channel : nodeChannels) {
        scheduledPing.addChannel(channel);
        channel.addCloseListener(ActionListener.wrap(() -> {
            scheduledPing.removeChannel(channel);
        }));
    }
}
Code example source: org.elasticsearch/elasticsearch
public void openIndex(final OpenIndexClusterStateUpdateRequest request,
                      final ActionListener<OpenIndexClusterStateUpdateResponse> listener) {
    onlyOpenIndex(request, ActionListener.wrap(response -> {
        if (response.isAcknowledged()) {
            String[] indexNames = Arrays.stream(request.indices()).map(Index::getName).toArray(String[]::new);
            activeShardsObserver.waitForActiveShards(indexNames, request.waitForActiveShards(), request.ackTimeout(),
                shardsAcknowledged -> {
                    if (shardsAcknowledged == false) {
                        logger.debug("[{}] indices opened, but the operation timed out while waiting for " +
                            "enough shards to be started.", Arrays.toString(indexNames));
                    }
                    listener.onResponse(new OpenIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcknowledged));
                }, listener::onFailure);
        } else {
            listener.onResponse(new OpenIndexClusterStateUpdateResponse(false, false));
        }
    }, listener::onFailure));
}
Code example source: org.elasticsearch/elasticsearch
@Override
protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) {
    if (supplier != null) {
        return supplier.get() == null ? this : new TermsQueryBuilder(this.fieldName, supplier.get());
    } else if (this.termsLookup != null) {
        SetOnce<List<?>> supplier = new SetOnce<>();
        queryRewriteContext.registerAsyncAction((client, listener) -> {
            fetch(termsLookup, client, ActionListener.wrap(list -> {
                supplier.set(list);
                listener.onResponse(null);
            }, listener::onFailure));
        });
        return new TermsQueryBuilder(this.fieldName, supplier::get);
    }
    return this;
}
Code example source: org.elasticsearch/elasticsearch
@Override
protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state,
                               final ActionListener<ClusterRerouteResponse> listener) {
    ActionListener<ClusterRerouteResponse> logWrapper = ActionListener.wrap(
        response -> {
            if (request.dryRun() == false) {
                response.getExplanations().getYesDecisionMessages().forEach(logger::info);
            }
            listener.onResponse(response);
        },
        listener::onFailure
    );
    clusterService.submitStateUpdateTask("cluster_reroute (api)", new ClusterRerouteResponseAckedClusterStateUpdateTask(logger,
        allocationService, request, logWrapper));
}
Code example source: org.elasticsearch/elasticsearch
@Override
public void onResponse(Version version) {
    NodeChannels nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
    long relativeMillisTime = threadPool.relativeTimeInMillis();
    nodeChannels.channels.forEach(ch -> {
        // Mark the channel init time
        ch.getChannelStats().markAccessed(relativeMillisTime);
        ch.addCloseListener(ActionListener.wrap(nodeChannels::close));
    });
    keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile);
    listener.onResponse(nodeChannels);
}
Code example source: org.elasticsearch/elasticsearch
public void updateGlobalCheckpointForShard(final ShardId shardId) {
    final ThreadContext threadContext = threadPool.getThreadContext();
    try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
        threadContext.markAsSystemContext();
        execute(
            new Request(shardId),
            ActionListener.wrap(r -> {
            }, e -> {
                if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
                    logger.info(new ParameterizedMessage("{} global checkpoint sync failed", shardId), e);
                }
            }));
    }
}
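The example above ignores the success case entirely and logs a failure only when no expected "already closed" exception appears in its cause chain. The following sketch reproduces that filtering under stated assumptions: `QuietListener`, `ShardClosedException`, and the local `unwrap` helper are hypothetical stand-ins written for this illustration, and a list of strings takes the place of the logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified stand-in for ActionListener; not the Elasticsearch class. */
interface QuietListener<T> {
    void onResponse(T response);

    void onFailure(Exception e);

    static <T> QuietListener<T> wrap(Consumer<T> onResponse, Consumer<Exception> onFailure) {
        return new QuietListener<T>() {
            @Override
            public void onResponse(T response) {
                onResponse.accept(response);
            }

            @Override
            public void onFailure(Exception e) {
                onFailure.accept(e);
            }
        };
    }
}

final class IgnoreExpectedFailuresDemo {
    /** Hypothetical marker for failures that are expected while a shard shuts down. */
    static final class ShardClosedException extends RuntimeException {
        ShardClosedException(String message) {
            super(message);
        }
    }

    /** Walks the cause chain and returns the first throwable of the given type, or null. */
    static Throwable unwrap(Throwable t, Class<? extends Throwable> type) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (type.isInstance(current)) {
                return current;
            }
        }
        return null;
    }

    /** Success is a no-op; only unexpected failures are recorded in the log. */
    static <T> QuietListener<T> loggingListener(List<String> log) {
        return QuietListener.wrap(r -> { }, e -> {
            if (unwrap(e, ShardClosedException.class) == null) {
                log.add("sync failed: " + e.getMessage());
            }
        });
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        QuietListener<String> listener = loggingListener(log);
        listener.onResponse("done");
        listener.onFailure(new RuntimeException("wrapper", new ShardClosedException("closed")));
        listener.onFailure(new IllegalStateException("disk full"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only the second failure is recorded, because the first one carries the expected exception as its cause.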
Code example source: org.elasticsearch/elasticsearch
@Override
protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
    ActionRequestBuilder<Request, Response, RequestBuilder>>
void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
    remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> {
            Transport.Connection connection = remoteClusterService.getConnection(clusterAlias);
            service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY,
                new ActionListenerResponseHandler<>(listener, action.getResponseReader()));
        },
        listener::onFailure));
}
Code example source: org.elasticsearch/elasticsearch
void cleanScrollIds(List<ScrollIdForNode> parsedScrollIds) {
    SearchScrollAsyncAction.collectNodesAndRun(parsedScrollIds, nodes, searchTransportService, ActionListener.wrap(
        lookup -> {
            for (ScrollIdForNode target : parsedScrollIds) {
                final DiscoveryNode node = lookup.apply(target.getClusterAlias(), target.getNode());
                if (node == null) {
                    onFreedContext(false);
                } else {
                    try {
                        Transport.Connection connection = searchTransportService.getConnection(target.getClusterAlias(), node);
                        searchTransportService.sendFreeContext(connection, target.getScrollId(),
                            ActionListener.wrap(freed -> onFreedContext(freed.isFreed()), e -> onFailedFreedContext(e, node)));
                    } catch (Exception e) {
                        onFailedFreedContext(e, node);
                    }
                }
            }
        }, listener::onFailure));
}
Code example source: org.elasticsearch/elasticsearch
public static <Response extends ReplicationResponse & WriteResponse>
ActionListener<BulkResponse> wrapBulkResponse(ActionListener<Response> listener) {
    return ActionListener.wrap(bulkItemResponses -> {
        assert bulkItemResponses.getItems().length == 1 : "expected only one item in bulk request";
        BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0];
        if (bulkItemResponse.isFailed() == false) {
            final DocWriteResponse response = bulkItemResponse.getResponse();
            listener.onResponse((Response) response);
        } else {
            listener.onFailure(bulkItemResponse.getFailure().getCause());
        }
    }, listener::onFailure);
}
Code example source: org.elasticsearch/elasticsearch
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
    CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state,
        (i) -> {
            IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
            return shard == null ? null : shard.getPrimary().getDocs();
        }, sourceIndex, targetIndex);
    createIndexService.createIndex(
        updateRequest,
        ActionListener.wrap(response ->
            listener.onResponse(new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(),
                updateRequest.index())), listener::onFailure
        )
    );
}
Code example source: org.elasticsearch/elasticsearch
@Override
protected void masterOperation(final CreateIndexRequest request, final ClusterState state,
                               final ActionListener<CreateIndexResponse> listener) {
    String cause = request.cause();
    if (cause.length() == 0) {
        cause = "api";
    }
    final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
    final CreateIndexClusterStateUpdateRequest updateRequest =
        new CreateIndexClusterStateUpdateRequest(request, cause, indexName, request.index(), request.updateAllTypes())
            .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
            .settings(request.settings()).mappings(request.mappings())
            .aliases(request.aliases())
            .waitForActiveShards(request.waitForActiveShards());
    createIndexService.createIndex(updateRequest, ActionListener.wrap(response ->
        listener.onResponse(new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)),
        listener::onFailure));
}