如何将Elasticsearch JSON String Response与Aggregation转换为Elasticsearch SearchResponse Object

jrcvhitl  于 2023-06-21  发布在  ElasticSearch
关注(0)|答案(5)|浏览(185)

我想将一个json字符串序列化为Elasticsearch SearchResponse对象。如果json字符串不包含聚合,它可以正常工作。
如果json字符串包含聚合,XContentParser将抛出ParsingException[Could not parse aggregation keyed as [target_field]异常。
我用来将json字符串序列化为Elasticsearch SearchResponse对象的代码:

Settings settings = Settings.builder().build();
    SearchModule searchModule = new SearchModule(settings, false, new ArrayList<>());

    NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents());

    JsonXContentParser xContentParser = new JsonXContentParser(xContentRegistry,
            new JsonFactory().createParser(json));
    SearchResponse response = SearchResponse.fromXContent(xContentParser);

看来我必须将聚合注册到NamedXContentRegistry,但我不知道如何注册。

zlwx9yxi

zlwx9yxi1#

背景:

我是根据我创建SearchResponse对象以编写Java单元测试的经验编写这个答案的。目标是从Elasticsearch查询中获取任何JSON响应对象,将其编组到SearchResponse对象中,并对创建可消费输出的业务逻辑进行单元测试。
我们使用Elasticsearch 6.7,高级REST客户端,并使用Elastic的POJO解析SearchResponse(而不是只做一个.toString()并使用GSON或Jackson操作它)。

解决方案说明:

Elasticsearch的高级REST客户端一般解析来自低级REST客户端的结果。SearchRequest的响应JSON在 search 方法第129行的RestHighLevelClient中转换为SearchResponseObject。此方法在第1401行调用 performRequestAndParseEntity,它接受entityParser作为CheckedFunction<XContentParser, Resp, IOException>。最后,我们可以看到,当在第1401行调用entityParser时,它在第1714行调用parseEntity方法,该方法确定实体的XContentType并最终执行解析。值得注意的是,当在第1726行创建解析器时,registry被传递到解析器中。此 registry 包含响应字段可能的所有可能的XContent值。registry 是在第288行构造RestHighLevelClient时创建的。类型的完整列表(包括聚合类型)在第1748行上列出。

关于解决方案:

阅读了Elasticsearch关于这方面的讨论,似乎如果你想从Elastic注入一个JSON Response到SearchResponse对象中,需要创建一个NamedXContentRegistry和XContents测试列表,你必须重新创建解析。一个辅助方法sourced from Elastic's discussion可以做到这一点:

public static List<NamedXContentRegistry.Entry> getDefaultNamedXContents() {
    Map<String, ContextParser<Object, ? extends Aggregation>> map = new HashMap<>();
    map.put(TopHitsAggregationBuilder.NAME, (p, c) -> ParsedTopHits.fromXContent(p, (String) c));
    map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c));
    List<NamedXContentRegistry.Entry> entries = map.entrySet().stream()
            .map(entry -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(entry.getKey()), entry.getValue()))
            .collect(Collectors.toList());
  return entries;
}

上述代码中的Map需要包含测试所需的***所有***聚合。有两个以上,两个在这里为简洁。
使用这个助手getNamedXContents()方法,现在可以使用以下方法获取JSON字符串并将其注入SearchResponse。Also sourced from Elastic's Discussion

public static SearchResponse getSearchResponseFromJson(String jsonResponse){
    try {
        NamedXContentRegistry registry = new NamedXContentRegistry(getDefaultNamedXContents());
        XContentParser parser = JsonXContent.jsonXContent.createParser(registry, jsonResponse);
        return SearchResponse.fromXContent(parser);
    } catch (IOException e) {
        System.out.println("exception " + e);
    }catch (Exception e){
        System.out.println("exception " + e);
    }
    return new SearchResponse();
}

应用具有聚合结果的解决方案:

Elasticsearch需要一个提示来知道将其解析为哪种类型的聚合。当添加**?时,由弹性提供提示。typed_keys到查询。在Aggregation Type Hints上的Elasticsearch文档中显示了一个示例。
要将JSON String注入到
SearchResponse**对象中,必须(1)使用上述方法,(2)注入一个带有类型提示的字符串。

主要来源:

  1. https://discuss.elastic.co/t/elasticsearch-json-response-to-searchresponse-object/124394/6
  2. https://github.com/elastic/elasticsearch/blob/master/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
  3. https://github.com/elastic/elasticsearch/blob/master/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java
  4. https://www.elastic.co/guide/en/elasticsearch/reference/current/returning-aggregation-type.html
    注意:2015年前后有很多文章说这是不可能的。这显然是不正确的。
eyh26e7m

eyh26e7m2#

根据上面的答案,我是这样做的:
我写了一个这样的JSON:

XContentBuilder builder = XContentFactory.jsonBuilder();
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
String result = Strings.toString(builder);

然后我就这样读了

try {
     NamedXContentRegistry registry = new NamedXContentRegistry(getDefaultNamedXContents());
     XContentParser parser = JsonXContent.jsonXContent.createParser(registry, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, result);
     SearchResponse searchResponse = SearchResponse.fromXContent(parser);
 } catch (IOException e) {
     System.out.println("exception " + e);
 } catch (Exception e) {
     System.out.println("exception " + e);
 }

public static List<NamedXContentRegistry.Entry> getDefaultNamedXContents() {
    Map<String, ContextParser<Object, ? extends Aggregation>> map = new HashMap<>();
    map.put(TopHitsAggregationBuilder.NAME, (p, c) -> ParsedTopHits.fromXContent(p, (String) c));
    map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c));
    List<NamedXContentRegistry.Entry> entries = map.entrySet().stream()
            .map(entry -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(entry.getKey()), entry.getValue()))
            .collect(Collectors.toList());
    return entries;
}

希望它能起作用:)

gg0vcinb

gg0vcinb3#

你需要在你请求的URL末尾加上?typed_keys,比如/cranking/_search?typed_keys,看看这个参考。
你最好像框架源代码一样在NamedXContentRegistry中添加更多的parse注册表。以下是所有注册表项:

private List<NamedXContentRegistry.Entry> getProvidedNamedXContents() {
    List<NamedXContentRegistry.Entry> entries = new ArrayList<>();

    for (NamedXContentProvider service : ServiceLoader.load(NamedXContentProvider.class)) {
        entries.addAll(service.getNamedXContentParsers());
    }

    return entries;
}

private NamedXContentRegistry getDefaultNamedXContentRegistry() {
    List<NamedXContentRegistry.Entry> entries = new ArrayList<>();
    entries.addAll(getDefaultNamedXContents());
    entries.addAll(getProvidedNamedXContents());
    return new NamedXContentRegistry(entries);
}

private List<NamedXContentRegistry.Entry> getDefaultNamedXContents() {
    Map<String, ContextParser<Object, ? extends Aggregation>> map = new HashMap<>();
    map.put("cardinality", (p, c) -> ParsedCardinality.fromXContent(p, (String) c));
    map.put("hdr_percentiles", (p, c) -> ParsedHDRPercentiles.fromXContent(p, (String) c));
    map.put("hdr_percentile_ranks", (p, c) -> ParsedHDRPercentileRanks.fromXContent(p, (String) c));
    map.put("tdigest_percentiles", (p, c) -> ParsedTDigestPercentiles.fromXContent(p, (String) c));
    map.put("tdigest_percentile_ranks", (p, c) -> ParsedTDigestPercentileRanks.fromXContent(p, (String) c));
    map.put("percentiles_bucket", (p, c) -> ParsedPercentilesBucket.fromXContent(p, (String) c));
    map.put("min", (p, c) -> ParsedMin.fromXContent(p, (String) c));
    map.put("max", (p, c) -> ParsedMax.fromXContent(p, (String) c));
    map.put("sum", (p, c) -> ParsedSum.fromXContent(p, (String) c));
    map.put("avg", (p, c) -> ParsedAvg.fromXContent(p, (String) c));
    map.put("value_count", (p, c) -> ParsedValueCount.fromXContent(p, (String) c));
    map.put("simple_value", (p, c) -> ParsedSimpleValue.fromXContent(p, (String) c));
    map.put("derivative", (p, c) -> ParsedDerivative.fromXContent(p, (String) c));
    map.put("bucket_metric_value", (p, c) -> ParsedBucketMetricValue.fromXContent(p, (String) c));
    map.put("stats", (p, c) -> ParsedStats.fromXContent(p, (String) c));
    map.put("stats_bucket", (p, c) -> ParsedStatsBucket.fromXContent(p, (String) c));
    map.put("extended_stats", (p, c) -> ParsedExtendedStats.fromXContent(p, (String) c));
    map.put("extended_stats_bucket", (p, c) -> ParsedExtendedStatsBucket.fromXContent(p, (String) c));
    map.put("geo_bounds", (p, c) -> ParsedGeoBounds.fromXContent(p, (String) c));
    map.put("geo_centroid", (p, c) -> ParsedGeoCentroid.fromXContent(p, (String) c));
    map.put("histogram", (p, c) -> ParsedHistogram.fromXContent(p, (String) c));
    map.put("date_histogram", (p, c) -> ParsedDateHistogram.fromXContent(p, (String) c));
    map.put("sterms", (p, c) -> ParsedStringTerms.fromXContent(p, (String) c));
    map.put("lterms", (p, c) -> ParsedLongTerms.fromXContent(p, (String) c));
    map.put("dterms", (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c));
    map.put("missing", (p, c) -> ParsedMissing.fromXContent(p, (String) c));
    map.put("nested", (p, c) -> ParsedNested.fromXContent(p, (String) c));
    map.put("reverse_nested", (p, c) -> ParsedReverseNested.fromXContent(p, (String) c));
    map.put("global", (p, c) -> ParsedGlobal.fromXContent(p, (String) c));
    map.put("filter", (p, c) -> ParsedFilter.fromXContent(p, (String) c));
    map.put("sampler", (p, c) -> ParsedSampler.fromXContent(p, (String) c));
    map.put("geohash_grid", (p, c) -> ParsedGeoHashGrid.fromXContent(p, (String) c));
    map.put("range", (p, c) -> ParsedRange.fromXContent(p, (String) c));
    map.put("date_range", (p, c) -> ParsedDateRange.fromXContent(p, (String) c));
    map.put("geo_distance", (p, c) -> ParsedGeoDistance.fromXContent(p, (String) c));
    map.put("filters", (p, c) -> ParsedFilters.fromXContent(p, (String) c));
    map.put("adjacency_matrix", (p, c) -> ParsedAdjacencyMatrix.fromXContent(p, (String) c));
    map.put("siglterms", (p, c) -> ParsedSignificantLongTerms.fromXContent(p, (String) c));
    map.put("sigsterms", (p, c) -> ParsedSignificantStringTerms.fromXContent(p, (String) c));
    map.put("scripted_metric", (p, c) -> ParsedScriptedMetric.fromXContent(p, (String) c));
    map.put("ip_range", (p, c) -> ParsedBinaryRange.fromXContent(p, (String) c));
    map.put("top_hits", (p, c) -> ParsedTopHits.fromXContent(p, (String) c));
    map.put("composite", (p, c) -> ParsedComposite.fromXContent(p, (String) c));
    List<NamedXContentRegistry.Entry> entries = map.entrySet().stream()
            .map((entry) -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField((String) entry.getKey()), entry.getValue()))
            .collect(Collectors.toList());
    entries.add(new NamedXContentRegistry.Entry(Suggest.Suggestion.class, new ParseField("term"), (parser, context) -> TermSuggestion.fromXContent(parser, (String) context)));
    entries.add(new NamedXContentRegistry.Entry(Suggest.Suggestion.class, new ParseField("phrase"), (parser, context) -> PhraseSuggestion.fromXContent(parser, (String) context)));
    entries.add(new NamedXContentRegistry.Entry(Suggest.Suggestion.class, new ParseField("completion"), (parser, context) -> CompletionSuggestion.fromXContent(parser, (String) context)));
    return entries;
}
eagi6jfj

eagi6jfj4#

我在使用ElasticSearch 7.15时遇到了同样的问题。上面的技术Maven给出的答案确实有助于解决这个问题,但它仍然对我不起作用,因为聚合物没有被识别。我的JSON中的聚合看起来像下面这个:

{
  ...
  "aggregations": {
    "my-agg-name": {                 
      "buckets": []
    }
  }
}

就像this article中解释的那样,这个问题是由于期望聚合在响应中返回为sterms #my-agg-name,而原始JSON只包含聚合的名称my-agg-name。使用上面的代码并将相关的聚合类型添加到注册表并不起作用。
我发现一个简单的解决方案就是在响应中返回聚合类型。因此,根据the official documentation for the aggregation feature中的示例,将***typed_key***添加到聚合请求中:

GET /my-index-000001/_search?typed_keys
{
  "aggs": {
    "my-agg-name": {
      "histogram": {
        "field": "my-field",
        "interval": 1000
      }
    }
  }
}

将在响应中返回相关的聚合类型,如下所示(histogram #my-agg-name):

{
  ...
  "aggregations": {
    "histogram#my-agg-name": {                 
      "buckets": []
    }
  }
}

现在解析器将识别相关的聚合类型,转换将成功。如果没有,请确保答案中返回的聚合类型包含在注册表Map中。这在ElasticSearch 7.15中很有用。

nimxete2

nimxete25#

让我从另一个Angular 来处理这个问题。你需要SearchResponse做什么?在大多数情况下,当您想要测试服务逻辑时,您需要在测试中从例如RestHighLevelClient搜索方法返回它。
如果是这样,为什么不嘲笑SearchResponse呢?假设您收到以下ES查询的SearchResponse

GET index_name/_search
{
  "aggs": {
    "AGG_NAME": {
      "terms": {
        "field": "someField"
      }
    }
  }
}

和响应:

{
  "aggregations" : {
    "AGG_NAME": {
      "buckets": [
        {
          "key": "AAA",
          "doc_count": 100
        },
        {
          "key": "BBB",
          "doc_count": 200
        }
      ]
    }
  }
}

现在在服务中,您将其转换为具有bucket count的Map<String, Long>

Map<String, Long> searchResponseToMap(SearchResponse searchResponse) {
        var buckets = Optional.ofNullable(searchResponse)
            .map(SearchResponse::getAggregations)
            .map(aggregations -> (ParsedStringTerms) aggregations.get("AGG_NAME"))
            .map(ParsedTerms::getBuckets)
            .orElse(List.of());

        return buckets.stream().collect(Collectors.toMap(Bucket::getKeyAsString, Bucket::getDocCount));
    }

在测试中,而不是像其他答案中建议的那样创建SearchResponse,您可以像这样模拟它:

@Test
void shouldReturnAggregations() {
    // AAA/BBB buckets
    Terms.Bucket aaa = mock(Terms.Bucket.class);
    ParsedStringTerms.ParsedBucket bbb = mock(ParsedStringTerms.ParsedBucket.class);
    when(aaa.getKeyAsString()).thenReturn("AAA");
    when(aaa.getDocCount()).thenReturn(100L);
    when(bbb.getKeyAsString()).thenReturn("BBB");
    when(bbb.getDocCount()).thenReturn(200L);
    
    // AGG_NAME
    ParsedStringTerms aggName = mock(ParsedStringTerms.class);
    when(aggName.getBuckets()).thenAnswer(invocation -> List.of(aaa, bbb));
    
    Aggregations aggregations = mock(Aggregations.class);
    when(aggregations.get("AGG_NAME")).thenReturn(aggName);
    
    // searchResponse
    SearchResponse searchResponse = mock(SearchResponse.class);
    when(searchResponse.getAggregations()).thenReturn(aggregations);
    
    when(esRepository.search(any())).thenReturn(searchResponse);
    
    // When
    Map<String, Long> result = service.fetchAndTransform();
    
    // Then
    assertThat(result).containsEntry("AAA", 100L)
        .containsEntry("BBB", 200L);
}

相关问题