ElasticSearch滚动API异步执行

j1dl9f46  于 2022-11-02  发布在  ElasticSearch
关注(0)|答案(2)|浏览(168)

我正在运行一个ElasticSearch集群5.6版本,每天索引大小为70Gb。每天结束时,我们需要对过去7天的每小时进行总结。我们使用的是Java版本的高级Rest客户端,考虑到每个查询返回的文档数量对滚动结果至关重要。
为了充分利用现有的CPU,并减少读取时间,我们考虑使用Scroll Asynchronous版本的搜索,但我们缺少一些示例,至少缺少其中的逻辑。
我们已经检查了弹性相关文档,但它的模糊:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/java-rest-high-search-scroll.html#java-rest-high-search-scroll-async
我们也在弹性论坛上问过他们说的话,但看起来没有人能回答:
https://discuss.elastic.co/t/no-code-for-example-of-using-scrollasync-with-the-java-high-level-rest-client/165126
在这方面的任何帮助将非常感谢,并肯定我不是唯一一个有这个要求。

a6b3iqyw

a6b3iqyw1#

下面是示例代码:

public class App {

  public static void main(String[] args) throws IOException, InterruptedException {
    RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(HttpHost.create("http://localhost:9200")));

    client.indices().delete(new DeleteIndexRequest("test"), RequestOptions.DEFAULT);
    for (int i = 0; i < 100; i++) {
      client.index(new IndexRequest("test", "_doc").source("foo", "bar"), RequestOptions.DEFAULT);
    }
    client.indices().refresh(new RefreshRequest("test"), RequestOptions.DEFAULT);

    SearchRequest searchRequest = new SearchRequest("test").scroll(TimeValue.timeValueSeconds(30L));
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    String scrollId = searchResponse.getScrollId();

    System.out.println("response = " + searchResponse);

    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId)
        .scroll(TimeValue.timeValueSeconds(30));

    //I was missing to wait for the results
    final CountDownLatch countDownLatch = new CountDownLatch(1);

    client.scrollAsync(scrollRequest, RequestOptions.DEFAULT, new ActionListener<SearchResponse>() {
      @Override
      public void onResponse(SearchResponse searchResponse) {
        System.out.println("response async = " + searchResponse);
      }

      @Override
      public void onFailure(Exception e) {
      }
    });

    //Here we wait
    countDownLatch.await();

    //Clear the scroll if we finish before the time to keep it alive.
    //Otherwise it will be clear when the time is reached.
    ClearScrollRequest request = new ClearScrollRequest();
    request.addScrollId(scrollId);

    client.clearScrollAsync(request, new ActionListener<ClearScrollResponse>() {
      @Override
      public void onResponse(ClearScrollResponse clearScrollResponse) {
      }

      @Override
      public void onFailure(Exception e) {
      }
    });

    client.close();           
  }
}

感谢大卫·皮拉托elastic discussion

cwdobuhd

cwdobuhd2#

过去7天内每小时的摘要
听起来您似乎希望对数据运行一些聚合,而不是获取原始文档。可能在第一级使用日期直方图,以便以1小时为间隔进行聚合。在该日期直方图内,您需要一个内部聚合来运行您的聚合-根据所需的汇总,可以是指标/桶。
从Elasticsearch v6.1开始,你可以使用Composite Aggregation,以便使用分页来获取所有的结果桶。
复合聚合可用于有效地对来自多级聚合的所有桶分页。此聚合提供了一种方法,以类似于滚动对文档所做的那样,对特定聚合的所有桶进行流处理。
不幸的是,在v6.1之前不存在此选项,因此您要么需要升级ES才能使用它,要么找到另一种方法,如分解为多个查询,这些查询一起将满足7天的要求。

相关问题