org.elasticsearch.action.ActionListener.wrap()方法的使用及代码示例

x33g5p2x  于2022-01-16 转载在 其他  
字(12.9k)|赞(0)|评价(0)|浏览(132)

本文整理了Java中org.elasticsearch.action.ActionListener.wrap()方法的一些代码示例,展示了ActionListener.wrap()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ActionListener.wrap()方法的具体详情如下:
包路径:org.elasticsearch.action.ActionListener
类名称:ActionListener
方法名:wrap

ActionListener.wrap介绍

[英]Creates a listener that listens for a response (or failure) and executes the corresponding runnable when the response (or failure) is received.
[中]创建侦听响应(或故障)的侦听器,并在收到响应(或故障)时执行相应的runnable。

代码示例

代码示例来源:origin: org.elasticsearch/elasticsearch

/**
 * Creates a listener that listens for a response (or failure) and executes the
 * corresponding runnable when the response (or failure) is received.
 *
 * @param runnable the runnable that will be called in event of success or failure
 * @param <Response> the type of the response
 * @return a listener that listens for responses and invokes the runnable when received
 */
static <Response> ActionListener<Response> wrap(Runnable runnable) {
  return wrap(r -> runnable.run(), e -> runnable.run());
}

代码示例来源:origin: org.elasticsearch/elasticsearch

void updateRemoteCluster(
    final String clusterAlias,
    final List<String> addresses,
    final String proxyAddress,
    final ActionListener<Void> connectionListener) {
  final List<Tuple<String, Supplier<DiscoveryNode>>> nodes =
      addresses.stream().<Tuple<String, Supplier<DiscoveryNode>>>map(address -> Tuple.tuple(address, () ->
          buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress)))
      ).collect(Collectors.toList());
  updateRemoteClusters(Collections.singletonMap(clusterAlias, new Tuple<>(proxyAddress, nodes)), connectionListener);
}

代码示例来源:origin: org.elasticsearch/elasticsearch

protected void serverAcceptedChannel(TcpChannel channel) {
  boolean addedOnThisCall = acceptedChannels.add(channel);
  assert addedOnThisCall : "Channel should only be added to accepted channel set once";
  // Mark the channel init time
  channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
  channel.addCloseListener(ActionListener.wrap(() -> acceptedChannels.remove(channel)));
  logger.trace(() -> new ParameterizedMessage("Tcp transport channel accepted: {}", channel));
}

代码示例来源:origin: org.elasticsearch/elasticsearch

/**
 * Fetches all shards for the search request from this remote connection. This is used to later run the search on the remote end.
 */
public void fetchSearchShards(ClusterSearchShardsRequest searchRequest,
               ActionListener<ClusterSearchShardsResponse> listener) {
  final ActionListener<ClusterSearchShardsResponse> searchShardsListener;
  final Consumer<Exception> onConnectFailure;
  if (skipUnavailable) {
    onConnectFailure = (exception) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY);
    searchShardsListener = ActionListener.wrap(listener::onResponse, (e) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY));
  } else {
    onConnectFailure = listener::onFailure;
    searchShardsListener = listener;
  }
  // in case we have no connected nodes we try to connect and if we fail we either notify the listener or not depending on
  // the skip_unavailable setting
  ensureConnected(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, searchShardsListener), onConnectFailure));
}

代码示例来源:origin: org.elasticsearch/elasticsearch

@Override
  protected void doExecute(RemoteInfoRequest remoteInfoRequest, ActionListener<RemoteInfoResponse> listener) {
    remoteClusterService.getRemoteConnectionInfos(ActionListener.wrap(remoteConnectionInfos
        -> listener.onResponse(new RemoteInfoResponse(remoteConnectionInfos)), listener::onFailure));
  }
}

代码示例来源:origin: org.elasticsearch/elasticsearch

/**
 * Notifies the master node to create new persistent task and to assign it to a node.
 */
public <Params extends PersistentTaskParams> void sendStartRequest(final String taskId,
                                  final String taskName,
                                  final Params taskParams,
                                  final ActionListener<PersistentTask<Params>> listener) {
  @SuppressWarnings("unchecked")
  final ActionListener<PersistentTask<?>> wrappedListener =
    ActionListener.wrap(t -> listener.onResponse((PersistentTask<Params>) t), listener::onFailure);
  StartPersistentTaskAction.Request request = new StartPersistentTaskAction.Request(taskId, taskName, taskParams);
  execute(request, StartPersistentTaskAction.INSTANCE, wrappedListener);
}

代码示例来源:origin: org.elasticsearch/elasticsearch

public final void run() {
  final ScrollIdForNode[] context = scrollId.getContext();
  if (context.length == 0) {
    listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY));
  } else {
    collectNodesAndRun(Arrays.asList(context), nodes, searchTransportService, ActionListener.wrap(lookup -> run(lookup, context),
      listener::onFailure));
  }
}

代码示例来源:origin: org.elasticsearch/elasticsearch

ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
  if (itemResponses.isEmpty()) {
    return ActionListener.wrap(
        response -> actionListener.onResponse(new BulkResponse(response.getItems(),
            response.getTook().getMillis(), ingestTookInMillis)),
        actionListener::onFailure);
  } else {
    return new IngestBulkResponseListener(ingestTookInMillis, originalSlots, itemResponses, actionListener);
  }
}

代码示例来源:origin: org.elasticsearch/elasticsearch

/**
 * Executes an asynchronous persistent task action using the client.
 * <p>
 * The origin is set in the context and the listener is wrapped to ensure the proper context is restored
 */
private <Req extends ActionRequest, Resp extends PersistentTaskResponse, Builder extends ActionRequestBuilder<Req, Resp, Builder>>
  void execute(final Req request, final Action<Req, Resp, Builder> action, final ActionListener<PersistentTask<?>> listener) {
    try {
      client.execute(action, request,
          ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure));
    } catch (Exception e) {
      listener.onFailure(e);
    }
}

代码示例来源:origin: org.elasticsearch/elasticsearch

void registerNodeConnection(List<TcpChannel> nodeChannels, ConnectionProfile connectionProfile) {
  TimeValue pingInterval = connectionProfile.getPingInterval();
  if (pingInterval.millis() < 0) {
    return;
  }
  final ScheduledPing scheduledPing = pingIntervals.computeIfAbsent(pingInterval, ScheduledPing::new);
  scheduledPing.ensureStarted();
  for (TcpChannel channel : nodeChannels) {
    scheduledPing.addChannel(channel);
    channel.addCloseListener(ActionListener.wrap(() -> {
      scheduledPing.removeChannel(channel);
    }));
  }
}

代码示例来源:origin: org.elasticsearch/elasticsearch

public void openIndex(final OpenIndexClusterStateUpdateRequest request,
           final ActionListener<OpenIndexClusterStateUpdateResponse> listener) {
  onlyOpenIndex(request, ActionListener.wrap(response -> {
    if (response.isAcknowledged()) {
      String[] indexNames = Arrays.stream(request.indices()).map(Index::getName).toArray(String[]::new);
      activeShardsObserver.waitForActiveShards(indexNames, request.waitForActiveShards(), request.ackTimeout(),
        shardsAcknowledged -> {
          if (shardsAcknowledged == false) {
            logger.debug("[{}] indices opened, but the operation timed out while waiting for " +
              "enough shards to be started.", Arrays.toString(indexNames));
          }
          listener.onResponse(new OpenIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcknowledged));
        }, listener::onFailure);
    } else {
      listener.onResponse(new OpenIndexClusterStateUpdateResponse(false, false));
    }
  }, listener::onFailure));
}

代码示例来源:origin: org.elasticsearch/elasticsearch

@Override
  protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) {
    if (supplier != null) {
      return supplier.get() == null ? this : new TermsQueryBuilder(this.fieldName, supplier.get());
    } else if (this.termsLookup != null) {
      SetOnce<List<?>> supplier = new SetOnce<>();
      queryRewriteContext.registerAsyncAction((client, listener) -> {
        fetch(termsLookup, client, ActionListener.wrap(list -> {
          supplier.set(list);
          listener.onResponse(null);
        }, listener::onFailure));

      });
      return new TermsQueryBuilder(this.fieldName, supplier::get);
    }
    return this;
  }
}

代码示例来源:origin: org.elasticsearch/elasticsearch

@Override
protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state,
                final ActionListener<ClusterRerouteResponse> listener) {
  ActionListener<ClusterRerouteResponse> logWrapper = ActionListener.wrap(
    response -> {
      if (request.dryRun() == false) {
        response.getExplanations().getYesDecisionMessages().forEach(logger::info);
      }
      listener.onResponse(response);
    },
    listener::onFailure
  );
  clusterService.submitStateUpdateTask("cluster_reroute (api)", new ClusterRerouteResponseAckedClusterStateUpdateTask(logger,
    allocationService, request, logWrapper));
}

代码示例来源:origin: org.elasticsearch/elasticsearch

@Override
public void onResponse(Version version) {
  NodeChannels nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
  long relativeMillisTime = threadPool.relativeTimeInMillis();
  nodeChannels.channels.forEach(ch -> {
    // Mark the channel init time
    ch.getChannelStats().markAccessed(relativeMillisTime);
    ch.addCloseListener(ActionListener.wrap(nodeChannels::close));
  });
  keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile);
  listener.onResponse(nodeChannels);
}

代码示例来源:origin: org.elasticsearch/elasticsearch

public void updateGlobalCheckpointForShard(final ShardId shardId) {
  final ThreadContext threadContext = threadPool.getThreadContext();
  try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
    threadContext.markAsSystemContext();
    execute(
        new Request(shardId),
        ActionListener.wrap(r -> {
        }, e -> {
          if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
            logger.info(new ParameterizedMessage("{} global checkpoint sync failed", shardId), e);
          }
        }));
  }
}

代码示例来源:origin: org.elasticsearch/elasticsearch

@Override
protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
  ActionRequestBuilder<Request, Response, RequestBuilder>>
void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
  remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> {
    Transport.Connection connection = remoteClusterService.getConnection(clusterAlias);
    service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY,
      new ActionListenerResponseHandler<>(listener, action.getResponseReader()));
  },
  listener::onFailure));
}

代码示例来源:origin: org.elasticsearch/elasticsearch

void cleanScrollIds(List<ScrollIdForNode> parsedScrollIds) {
  SearchScrollAsyncAction.collectNodesAndRun(parsedScrollIds, nodes, searchTransportService, ActionListener.wrap(
    lookup -> {
      for (ScrollIdForNode target : parsedScrollIds) {
        final DiscoveryNode node = lookup.apply(target.getClusterAlias(), target.getNode());
        if (node == null) {
          onFreedContext(false);
        } else {
          try {
            Transport.Connection connection = searchTransportService.getConnection(target.getClusterAlias(), node);
            searchTransportService.sendFreeContext(connection, target.getScrollId(),
              ActionListener.wrap(freed -> onFreedContext(freed.isFreed()), e -> onFailedFreedContext(e, node)));
          } catch (Exception e) {
            onFailedFreedContext(e, node);
          }
        }
      }
    }, listener::onFailure));
}

代码示例来源:origin: org.elasticsearch/elasticsearch

public static <Response extends ReplicationResponse & WriteResponse>
ActionListener<BulkResponse> wrapBulkResponse(ActionListener<Response> listener) {
  return ActionListener.wrap(bulkItemResponses -> {
    assert bulkItemResponses.getItems().length == 1 : "expected only one item in bulk request";
    BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0];
    if (bulkItemResponse.isFailed() == false) {
      final DocWriteResponse response = bulkItemResponse.getResponse();
      listener.onResponse((Response) response);
    } else {
      listener.onFailure(bulkItemResponse.getFailure().getCause());
    }
  }, listener::onFailure);
}

代码示例来源:origin: org.elasticsearch/elasticsearch

@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
  CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state,
    (i) -> {
      IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
      return shard == null ? null : shard.getPrimary().getDocs();
    }, sourceIndex, targetIndex);
  createIndexService.createIndex(
    updateRequest,
    ActionListener.wrap(response ->
        listener.onResponse(new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(),
            updateRequest.index())), listener::onFailure
    )
  );
}

代码示例来源:origin: org.elasticsearch/elasticsearch

@Override
protected void masterOperation(final CreateIndexRequest request, final ClusterState state,
                final ActionListener<CreateIndexResponse> listener) {
  String cause = request.cause();
  if (cause.length() == 0) {
    cause = "api";
  }
  final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
  final CreateIndexClusterStateUpdateRequest updateRequest =
    new CreateIndexClusterStateUpdateRequest(request, cause, indexName, request.index(), request.updateAllTypes())
      .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
      .settings(request.settings()).mappings(request.mappings())
      .aliases(request.aliases())
      .waitForActiveShards(request.waitForActiveShards());
  createIndexService.createIndex(updateRequest, ActionListener.wrap(response ->
    listener.onResponse(new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)),
    listener::onFailure));
}

相关文章