Usage of org.elasticsearch.client.Client.prepareBulk() with code examples

x33g5p2x · Reposted 2022-01-18 under "Other"

This article collects Java code examples for the org.elasticsearch.client.Client.prepareBulk() method, showing how prepareBulk() is used in practice. The examples are drawn from selected open-source projects on platforms such as GitHub, Stack Overflow, and Maven, and should serve as useful references. Details of the Client.prepareBulk() method:
Package: org.elasticsearch.client
Class: Client
Method: prepareBulk

About Client.prepareBulk

From the Javadoc: Executes a bulk of index / delete operations.
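Before the project-specific examples, here is a minimal sketch of the pattern they all share: build a `BulkRequestBuilder`, add index/delete requests, execute, and check for per-item failures. It assumes a pre-7.x transport `Client` that is already connected; the index name `my_index`, type `my_type`, and document IDs are illustrative assumptions, not part of any project below. It requires a running Elasticsearch cluster to actually execute.

```java
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentType;

class BulkSketch {
    // Index one document and delete another in a single bulk round trip.
    static void bulkExample(Client client) {
        BulkRequestBuilder bulk = client.prepareBulk();
        bulk.add(new IndexRequest("my_index", "my_type", "1")
                .source("{\"field\":\"value\"}", XContentType.JSON));
        bulk.add(new DeleteRequest("my_index", "my_type", "2"));
        BulkResponse response = bulk.execute().actionGet();
        if (response.hasFailures()) {
            // A bulk response is not all-or-nothing: inspect item failures.
            throw new IllegalStateException(response.buildFailureMessage());
        }
    }
}
```

Note that `hasFailures()` must be checked explicitly: a bulk request can partially succeed, and `actionGet()` will not throw for item-level failures.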

Code examples

Example source: thinkaurelius/titan

public void restore(Map<String,Map<String, List<IndexEntry>>> documents, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException {
  BulkRequestBuilder bulk = client.prepareBulk();
  int requests = 0;
  try {
    for (Map.Entry<String, Map<String, List<IndexEntry>>> stores : documents.entrySet()) {
      String store = stores.getKey();
      for (Map.Entry<String, List<IndexEntry>> entry : stores.getValue().entrySet()) {
        String docID = entry.getKey();
        List<IndexEntry> content = entry.getValue();
        if (content == null || content.size() == 0) {
          // delete
          if (log.isTraceEnabled())
            log.trace("Deleting entire document {}", docID);
          bulk.add(new DeleteRequest(indexName, store, docID));
          requests++;
        } else {
          // Add
          if (log.isTraceEnabled())
            log.trace("Adding entire document {}", docID);
          bulk.add(new IndexRequest(indexName, store, docID).source(getNewDocument(content, informations.get(store), IndexMutation.determineTTL(content))));
          requests++;
        }
      }
    }
    if (requests > 0)
      bulk.execute().actionGet();
  } catch (Exception e) {
    throw convert(e);
  }
}

Example source: apache/flume

@Override
public void execute() throws Exception {
 try {
  BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
  if (bulkResponse.hasFailures()) {
   throw new EventDeliveryException(bulkResponse.buildFailureMessage());
  }
 } finally {
  bulkRequestBuilder = client.prepareBulk();
 }
}

Example source: loklak/loklak_server

/**
 * Delete a list of documents for a given set of ids
 * ATTENTION: read about the time-out of version number checking in the method above.
 *
 * @param ids
 *            a map from the unique identifier of a document to the document type
 * @return the number of deleted documents
 */
private int deleteBulk(String indexName, Map<String, String> ids) {
  // bulk-delete the ids
  if (ids == null || ids.size() == 0) return 0;
  BulkRequestBuilder bulkRequest = elasticsearchClient.prepareBulk();
  for (Map.Entry<String, String> id : ids.entrySet()) {
    bulkRequest.add(new DeleteRequest().id(id.getKey()).index(indexName).type(id.getValue()));
  }
  bulkRequest.execute().actionGet();
  return ids.size();
}

Example source: Netflix/conductor

@Override
public void addTaskExecutionLogs(List<TaskExecLog> taskExecLogs) {
  if (taskExecLogs.isEmpty()) {
    return;
  }
  try {
    BulkRequestBuilder bulkRequestBuilder = elasticSearchClient.prepareBulk();
    for (TaskExecLog log : taskExecLogs) {
      IndexRequest request = new IndexRequest(logIndexName, LOG_DOC_TYPE);
      request.source(objectMapper.writeValueAsBytes(log), XContentType.JSON);
      bulkRequestBuilder.add(request);
    }
    new RetryUtil<BulkResponse>().retryOnException(
      () -> bulkRequestBuilder.execute().actionGet(),
      null,
      BulkResponse::hasFailures,
      RETRY_COUNT,
      "Indexing all execution logs into doc_type task",
      "addTaskExecutionLogs"
    );
  } catch (Exception e) {
    List<String> taskIds = taskExecLogs.stream()
      .map(TaskExecLog::getTaskId)
      .collect(Collectors.toList());
    logger.error("Failed to index task execution logs for tasks: {}", taskIds, e);
  }
}

Example source: apache/usergrid

private BulkRequestBuilder initRequest() {
  BulkRequestBuilder bulkRequest = client.prepareBulk();
  bulkRequest.setConsistencyLevel( WriteConsistencyLevel.fromString( config.getWriteConsistencyLevel() ) );
  bulkRequest.setRefresh( config.isForcedRefresh() );
  return bulkRequest;
}

Example source: pentaho/pentaho-kettle

public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException {
 Object[] rowData = getRow();
 if ( rowData == null ) {
  if ( currentRequest != null && currentRequest.numberOfActions() > 0 ) {
   // didn't fill a whole batch
   processBatch( false );
  }
  setOutputDone();
  return false;
 }
 if ( first ) {
  first = false;
  setupData();
  currentRequest = client.prepareBulk();
  requestsBuffer = new ArrayList<IndexRequestBuilder>( this.batchSize );
  initFieldIndexes();
 }
 try {
  data.inputRowBuffer[data.nextBufferRowIdx++] = rowData;
  return indexRow( data.inputRowMeta, rowData ) || !stopOnError;
 } catch ( KettleStepException e ) {
  throw e;
 } catch ( Exception e ) {
  rejectAllRows( e.getLocalizedMessage() );
  String msg = BaseMessages.getString( PKG, "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage() );
  logError( msg );
  throw new KettleStepException( msg, e );
 }
}

Example source: thinkaurelius/titan

@Override
public void mutate(Map<String, Map<String, IndexMutation>> mutations, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException {
  BulkRequestBuilder brb = client.prepareBulk();

Example source: loklak/loklak_server

BulkRequestBuilder bulkRequest = elasticsearchClient.prepareBulk();
for (BulkWriteEntry be: jsonMapList) {
  if (be.getId() == null) continue;

Example source: apache/flume

@Override
public void addEvent(Event event, IndexNameBuilder indexNameBuilder,
  String indexType, long ttlMs) throws Exception {
 if (bulkRequestBuilder == null) {
  bulkRequestBuilder = client.prepareBulk();
 }
 IndexRequestBuilder indexRequestBuilder = null;
 if (indexRequestBuilderFactory == null) {
  indexRequestBuilder = client
    .prepareIndex(indexNameBuilder.getIndexName(event), indexType)
    .setSource(serializer.getContentBuilder(event).bytes());
 } else {
  indexRequestBuilder = indexRequestBuilderFactory.createIndexRequest(
    client, indexNameBuilder.getIndexPrefix(event), indexType, event);
 }
 if (ttlMs > 0) {
  indexRequestBuilder.setTTL(ttlMs);
 }
 bulkRequestBuilder.add(indexRequestBuilder);
}

Example source: pentaho/pentaho-kettle

currentRequest = client.prepareBulk();
data.nextBufferRowIdx = 0;
data.inputRowBuffer = new Object[batchSize][];

Example source: SonarSource/sonarqube

public void putDocuments(IndexType indexType, Map<String, Object>... docs) {
 try {
  BulkRequestBuilder bulk = SHARED_NODE.client().prepareBulk()
   .setRefreshPolicy(REFRESH_IMMEDIATE);
  for (Map<String, Object> doc : docs) {
   bulk.add(new IndexRequest(indexType.getIndex(), indexType.getType())
    .source(doc));
  }
  BulkResponse bulkResponse = bulk.get();
  if (bulkResponse.hasFailures()) {
   throw new IllegalStateException(bulkResponse.buildFailureMessage());
  }
 } catch (Exception e) {
  throw Throwables.propagate(e);
 }
}

Example source: komoot/photon

public Importer(Client esClient, String languages) {
  this.esClient = esClient;
  this.bulkRequest = esClient.prepareBulk();
  this.languages = languages.split(",");
}

Example source: komoot/photon

public Updater(Client esClient, String languages) {
  this.esClient = esClient;
  this.bulkRequest = esClient.prepareBulk();
  this.languages = languages.split(",");
}

Example source: SonarSource/sonarqube

public void putDocuments(IndexType indexType, BaseDoc... docs) {
 try {
  BulkRequestBuilder bulk = SHARED_NODE.client().prepareBulk()
   .setRefreshPolicy(REFRESH_IMMEDIATE);
  for (BaseDoc doc : docs) {
   bulk.add(new IndexRequest(indexType.getIndex(), indexType.getType(), doc.getId())
    .parent(doc.getParent())
    .routing(doc.getRouting())
    .source(doc.getFields()));
  }
  BulkResponse bulkResponse = bulk.get();
  if (bulkResponse.hasFailures()) {
   throw new IllegalStateException(bulkResponse.buildFailureMessage());
  }
 } catch (Exception e) {
  throw Throwables.propagate(e);
 }
}

Example source: apache/nifi

final BulkRequestBuilder bulk = esClient.get().prepareBulk();

Example source: apache/nifi

final BulkRequestBuilder bulk = esClient.get().prepareBulk();
if (authToken != null) {
  bulk.putHeader("Authorization", authToken);

Example source: komoot/photon

private void updateDocuments() {
  BulkResponse bulkResponse = bulkRequest.execute().actionGet();
  if (bulkResponse.hasFailures()) {
    log.error("error while bulk update: " + bulkResponse.buildFailureMessage());
  }
  this.bulkRequest = this.esClient.prepareBulk();
}

Example source: komoot/photon

private void saveDocuments() {
  if (this.documentCount < 1) return;
  BulkResponse bulkResponse = bulkRequest.execute().actionGet();
  if (bulkResponse.hasFailures()) {
    log.error("error while bulk import:" + bulkResponse.buildFailureMessage());
  }
  this.bulkRequest = this.esClient.prepareBulk();
}

Example source: yacy/yacy_grid_mcp

/**
 * Delete a list of documents for a given set of ids
 * ATTENTION: read about the time-out of version number checking in the method above.
 * 
 * @param ids
 *            a map from the unique identifier of a document to the document type
 * @return the number of deleted documents
 */
public int deleteBulk(String indexName, Map<String, String> ids) {
  // bulk-delete the ids
  if (ids == null || ids.size() == 0) return 0;
  BulkRequestBuilder bulkRequest = elasticsearchClient.prepareBulk();
  for (Map.Entry<String, String> id : ids.entrySet()) {
    bulkRequest.add(new DeleteRequest().id(id.getKey()).index(indexName).type(id.getValue()));
  }
  bulkRequest.execute().actionGet();
  return ids.size();
}

Example source: larsga/Duke

private void flushIndex(boolean force) {
  if ((force && this.bulkRequestCounter > 0)
      || this.bulkRequestCounter >= this.bulkSize) {
    BulkResponse bulkResponse = this.bulkRequest.execute().actionGet();
    if (bulkResponse.hasFailures()) {
      throw new DukeException(bulkResponse.buildFailureMessage());
    }
    // reset bulk
    this.bulkRequestCounter = 0;
    this.bulkRequest = this.client.prepareBulk();
  }
}
