本文整理了Java中org.elasticsearch.threadpool.ThreadPool类的一些代码示例,展示了ThreadPool类的具体用法。这些代码示例主要来源于GitHub、Stack Overflow、Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ThreadPool类的具体详情如下:
包路径:org.elasticsearch.threadpool.ThreadPool
类名称:ThreadPool
暂无
代码示例来源:origin: floragunncom/search-guard
String injectedUserString = threadPool.getThreadContext().getTransient(ConfigConstants.SG_INJECTED_USER);
if (log.isDebugEnabled()) {
log.debug("Injected user string: {}", injectedUserString);
log.error("User string malformed, could not extract parts. User string was '{}.' User injection failed.", injectedUserString);
return false;
InetAddress iAdress = InetAddress.getByName(ipAndPort[0]);
int port = Integer.parseInt(ipAndPort[1]);
threadPool.getThreadContext().putTransient(ConfigConstants.SG_REMOTE_ADDRESS, new TransportAddress(iAdress, port));
} catch (UnknownHostException | NumberFormatException e) {
log.error("Cannot parse remote IP or port: {}, user injection failed.", parts[2], e);
threadPool.getThreadContext().putTransient(ConfigConstants.SG_REMOTE_ADDRESS, xffResolver.resolve(request));
threadPool.getThreadContext().putTransient(ConfigConstants.SG_USER, user);
auditLog.logSucceededLogin(parts[0], true, null, request);
if (log.isTraceEnabled()) {
代码示例来源:origin: org.elasticsearch/elasticsearch
/**
 * Provides the executor service used by this transport service.
 *
 * @return the executor returned by the thread pool's {@code generic()} accessor
 */
private ExecutorService getExecutorService() {
    final ExecutorService genericExecutor = threadPool.generic();
    return genericExecutor;
}
代码示例来源:origin: org.elasticsearch/elasticsearch
/**
 * Handles a successful ping response: while still running, clears the retry
 * counter and schedules this {@code NodeFD} again after {@code pingInterval}.
 *
 * @param response the ping response (not inspected here)
 */
@Override
public void handleResponse(PingResponse response) {
    if (running()) {
        // A response arrived, so start counting retries from zero again.
        retryCount = 0;
        threadPool.schedule(pingInterval, ThreadPool.Names.SAME, NodeFD.this);
    }
}
代码示例来源:origin: org.elasticsearch/elasticsearch
/**
 * Shuts the given service down and waits for it to terminate, escalating to
 * {@code shutdownNow()} if the first wait does not succeed.
 *
 * @param service  the executor service to terminate; may be <code>null</code>
 * @param timeout  how long each wait may last
 * @param timeUnit the unit of {@code timeout}
 * @return <code>true</code> if the service was terminated successfully; <code>false</code>
 *         if the service is <code>null</code> or if termination did not succeed within
 *         the timeout even after the forced shutdown
 */
public static boolean terminate(ExecutorService service, long timeout, TimeUnit timeUnit) {
    if (service == null) {
        return false;
    }
    // First attempt: orderly shutdown.
    service.shutdown();
    if (awaitTermination(service, timeout, timeUnit)) {
        return true;
    }
    // Second attempt: forced shutdown, then wait once more with the same timeout.
    service.shutdownNow();
    return awaitTermination(service, timeout, timeUnit);
}
代码示例来源:origin: org.elasticsearch/elasticsearch
/**
 * Hands the received ping request to every registered listener, running the
 * notification on the thread pool's generic executor.
 *
 * @param pingRequest the ping request to pass to each listener
 */
private void notifyPingReceived(final PingRequest pingRequest) {
    threadPool.generic().execute(() -> {
        for (Listener registered : listeners) {
            registered.onPingReceived(pingRequest);
        }
    });
}
代码示例来源:origin: org.elasticsearch/elasticsearch
/**
 * Marks this instance as master and schedules the cluster info update job.
 * When more than one data node is present, a refresh is also submitted to the
 * executor right away. A rejected submission is logged at debug level only.
 */
@Override
public void onMaster() {
    this.isMaster = true;
    if (logger.isTraceEnabled()) {
        logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob");
    }
    try {
        // Schedule the self-rescheduling job; it is delayed by updateFrequency.
        threadPool.schedule(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());
        final int dataNodeCount = clusterService.state().getNodes().getDataNodes().size();
        if (dataNodeCount > 1) {
            // Also submit a refresh directly to the executor, with no delay.
            threadPool.executor(executorName()).execute(() -> maybeRefresh());
        }
    } catch (EsRejectedExecutionException ex) {
        if (logger.isDebugEnabled()) {
            logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex);
        }
    }
}
代码示例来源:origin: org.elasticsearch/elasticsearch
/**
 * Runs {@code shardOperation} for the given request and shard on the executor
 * selected by {@code getExecutor(request, shardId)}, reporting the outcome to
 * the listener.
 *
 * @param request  the request to execute
 * @param shardId  the shard the operation targets
 * @param listener receives the operation's response, or the failure
 * @throws IOException declared by the signature; not thrown directly by this body
 */
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
    final AbstractRunnable shardTask = new AbstractRunnable() {
        @Override
        protected void doRun() throws Exception {
            listener.onResponse(shardOperation(request, shardId));
        }

        @Override
        public void onFailure(Exception e) {
            listener.onFailure(e);
        }
    };
    threadPool.executor(getExecutor(request, shardId)).execute(shardTask);
}
代码示例来源:origin: org.elasticsearch/elasticsearch
/**
 * Submits a refresh to the executor and, once the refresh has finished (normally
 * or with an exception), schedules this job again after {@code updateFrequency}
 * provided {@code isMaster} is still set. Rejected submissions are logged at
 * debug level and otherwise ignored.
 */
@Override
public void run() {
    if (logger.isTraceEnabled()) {
        logger.trace("Submitting new rescheduling cluster info update job");
    }
    final Runnable refreshThenReschedule = () -> {
        try {
            maybeRefresh();
        } finally {
            // Reschedule after the refresh, whether or not it succeeded.
            if (isMaster) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Scheduling next run for updating cluster info in: {}", updateFrequency.toString());
                }
                try {
                    threadPool.schedule(updateFrequency, executorName(), this);
                } catch (EsRejectedExecutionException ex) {
                    logger.debug("Reschedule cluster info service was rejected", ex);
                }
            }
        }
    };
    try {
        threadPool.executor(executorName()).execute(refreshThenReschedule);
    } catch (EsRejectedExecutionException ex) {
        if (logger.isDebugEnabled()) {
            logger.debug("Couldn't re-schedule cluster info update task - node might be shutting down", ex);
        }
    }
}
}
代码示例来源:origin: org.elasticsearch/elasticsearch
final ClusterState previousState = event.previousState();
final ClusterState state = event.state();
final String localNodeId = state.nodes().getLocalNodeId();
assert localNodeId != null;
if (logger.isDebugEnabled()) {
logger.debug("[{}] cleaning index, no longer part of the metadata", index);
indexSettings = indexService.getIndexSettings();
indicesService.removeIndex(index, DELETED, "index no longer part of the metadata");
} else if (previousState.metaData().hasIndex(index.getName())) {
final IndexMetaData metaData = previousState.metaData().index(index);
indexSettings = new IndexSettings(metaData, settings);
indicesService.deleteUnassignedIndex("deleted index was not assigned to local node", metaData, state);
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
代码示例来源:origin: org.elasticsearch/elasticsearch
@Override
public void clusterChanged(ClusterChangedEvent event) {
ClusterState state = event.state();
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
ImmutableOpenMap<String, IndexTemplateMetaData> templates = state.getMetaData().getTemplates();
if (state.nodes().isLocalNodeElectedMaster() == false) {
return;
if (changes.isPresent()) {
if (upgradesInProgress.compareAndSet(0, changes.get().v1().size() + changes.get().v2().size() + 1)) {
logger.info("Starting template upgrade to version {}, {} templates will be updated and {} will be removed",
Version.CURRENT,
changes.get().v1().size(),
changes.get().v2().size());
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.markAsSystemContext();
threadPool.generic().execute(() -> upgradeTemplates(changes.get().v1(), changes.get().v2()));
代码示例来源:origin: org.elasticsearch/elasticsearch
/**
 * Notifies all listeners of a master failure, then stops. Only the first call
 * that flips {@code notifiedMasterFailure} from false to true does anything;
 * later calls return immediately.
 *
 * @param masterNode the master node that failed
 * @param cause      the failure cause passed on to the listeners
 * @param reason     description passed to the listeners and appended to the stop reason
 */
private void notifyMasterFailure(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
    if (notifiedMasterFailure.compareAndSet(false, true) == false) {
        return;
    }
    try {
        threadPool.generic().execute(() -> {
            for (Listener registered : listeners) {
                registered.onMasterFailure(masterNode, cause, reason);
            }
        });
    } catch (EsRejectedExecutionException e) {
        logger.error("master failure notification was rejected, it's highly likely the node is shutting down", e);
    }
    // Stop regardless of whether the notification could be submitted.
    stop("master failure, " + reason);
}
代码示例来源:origin: org.elasticsearch/elasticsearch
/**
 * Delivers an error to the given response handler on the handler's own executor.
 * An error that is not already a {@code RemoteTransportException} is wrapped in
 * one first, keeping its message and the original as the cause.
 *
 * @param handler the handler that receives the exception
 * @param error   the error to deliver
 */
private void handleException(final TransportResponseHandler handler, Throwable error) {
    final RemoteTransportException rtx = error instanceof RemoteTransportException
        ? (RemoteTransportException) error
        : new RemoteTransportException(error.getMessage(), error);
    threadPool.executor(handler.executor()).execute(() -> {
        try {
            handler.handleException(rtx);
        } catch (Exception e) {
            // A failing handler is logged and must not propagate out of the executor task.
            logger.error(() -> new ParameterizedMessage("failed to handle exception response [{}]", handler), e);
        }
    });
}
代码示例来源:origin: org.elasticsearch/elasticsearch
if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
if (isThrottling.getAndSet(false)) {
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}",
numMergesInFlight, maxNumMerges);
deactivateThrottling();
System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
代码示例来源:origin: org.elasticsearch/elasticsearch
/**
 * Notifies all listeners that a node has failed, running the notification on
 * the thread pool's generic executor. If the submission is rejected, the
 * failure is logged at trace level and otherwise ignored.
 *
 * @param node   the node that failed
 * @param reason description of the failure passed to the listeners
 */
private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
    try {
        threadPool.generic().execute(() -> {
            for (Listener registered : listeners) {
                registered.onNodeFailure(node, reason);
            }
        });
    } catch (EsRejectedExecutionException ex) {
        logger.trace(() -> new ParameterizedMessage(
            "[node ] [{}] ignoring node failure (reason [{}]). Local node is shutting down", node, reason), ex);
    }
}
代码示例来源:origin: org.elasticsearch/elasticsearch
final List<TransportAddress> seedAddresses = new ArrayList<>();
seedAddresses.addAll(hostsProvider.buildDynamicHosts(createHostsResolver()));
final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {
seedAddresses.add(masterNode.value.getAddress());
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration);
final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedAddresses, resultsConsumer,
nodes.getLocalNode(), connectionProfile);
activePingingRounds.put(pingingRound.id(), pingingRound);
final AbstractRunnable pingSender = new AbstractRunnable() {
threadPool.generic().execute(pingSender);
threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC, pingSender);
threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC, pingSender);
threadPool.schedule(scheduleDuration, ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
代码示例来源:origin: dadoonet/fscrawler
logger.info("Elasticsearch Client for version {}.x connected to a node running version {}", compatibleVersion(), getVersion());
} catch (ElasticsearchStatusException e) {
logger.debug("got an error while trying to connect to elasticsearch cluster");
throw new IOException(e);
} catch (Exception e) {
logger.warn("failed to create elasticsearch client, disabling crawler...");
throw e;
(request, bulkListener) -> client.bulkAsync(request, bulkListener);
threadPool = new ThreadPool(Settings.builder().put("node.name", "fscrawler-client").build());
bulkProcessor = new BulkProcessor.Builder(bulkConsumer, new DebugListener(logger), threadPool)
.setBulkActions(settings.getElasticsearch().getBulkSize())
.setFlushInterval(TimeValue.timeValueMillis(settings.getElasticsearch().getFlushInterval().millis()))
.setBulkSize(new ByteSizeValue(settings.getElasticsearch().getByteSize().getBytes()))
.build();
代码示例来源:origin: floragunncom/search-guard
public User authenticate(final TransportRequest request, final String sslPrincipal, final Task task, final String action) {
if(log.isDebugEnabled() && request.remoteAddress() != null) {
log.debug("Transport authentication request from {}", request.remoteAddress());
log.error("Not yet initialized (you may need to run sgadmin)");
return null;
final String authorizationHeader = threadPool.getThreadContext().getHeader("Authorization");
代码示例来源:origin: apache/servicemix-bundles
/**
 * Holder for the field-name string constants {@code type}, {@code min},
 * {@code max}, {@code keep_alive} and {@code queue_size}.
 */
static final class Fields {
    static final String TYPE = "type";
    static final String MIN = "min";
    static final String MAX = "max";
    static final String KEEP_ALIVE = "keep_alive";
    static final String QUEUE_SIZE = "queue_size";

    /** Constants holder only; not meant to be instantiated. */
    private Fields() {
    }
}
}
代码示例来源:origin: floragunncom/search-guard
LOGGER.info("Check if "+searchguardIndex+" index exists ...");
.masterNodeTimeout(TimeValue.timeValueMinutes(1));
final ThreadContext threadContext = threadPool.getThreadContext();
try(StoredContext ctx = threadContext.stashContext()) {
threadContext.putHeader(ConfigConstants.SG_CONF_REQUEST_HEADER, "true");
LOGGER.error("Failure while executing IndicesExistsRequest {}",e2, e2);
bgThread.start();
代码示例来源:origin: dadoonet/fscrawler
/**
 * Closes the Elasticsearch client manager: waits up to 30 seconds for the bulk
 * processor to close, then shuts down the thread pool and closes the low-level
 * client.
 *
 * <p>The thread pool and the low-level client are released even when waiting
 * for the bulk processor fails, so an interrupted close does not leak them.
 *
 * @throws IOException if the wait for the bulk processor is interrupted (the
 *                     thread's interrupt flag is restored first), or if closing
 *                     the low-level client fails
 */
@Override
public void close() throws IOException {
    logger.debug("Closing Elasticsearch client manager");
    try {
        if (bulkProcessor != null) {
            try {
                bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can see the interruption.
                Thread.currentThread().interrupt();
                logger.warn("Did not succeed in closing the bulk processor for documents", e);
                throw new IOException(e);
            }
        }
    } finally {
        // Always release the remaining resources, even if the bulk processor wait failed.
        try {
            if (threadPool != null) {
                threadPool.shutdownNow();
            }
        } finally {
            if (lowLevelClient != null) {
                lowLevelClient.close();
            }
        }
    }
}
内容来源于网络,如有侵权,请联系作者删除!