How to query Elasticsearch (ES) from a RabbitMQ Spark stream

Posted by zzzyeukh on 2021-05-27 in Spark

I am using Spark 2. I am trying to consume a stream of search text from RabbitMQ and run queries against Elasticsearch with it.

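Map<String, String> params = new HashMap<>();
params.put("hosts", "IP");
params.put("queueName", "query");
params.put("exchangeName", "Exchangequery");
params.put("vHost", "/");
params.put("userName", "test");
params.put("password", "test");

// Turn each raw RabbitMQ message body into a String (UTF-8 payloads assumed)
Function<byte[], String> messageHandler = new Function<byte[], String>() {
    @Override
    public String call(byte[] message) {
        return new String(message, java.nio.charset.StandardCharsets.UTF_8);
    }
};

JavaReceiverInputDStream<String> messages = RabbitMQUtils.createJavaStream(jssc, String.class, params, messageHandler);

messages.foreachRDD(rdd -> {
    // This is where I want to query Elasticsearch with the values in this batch
});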

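The code above receives the stream from RabbitMQ, but I don't know how to connect to ES and query it with each batch of the stream. My concern is that if I use messages.foreachRDD() and send a separate Elasticsearch query for every input item, performance will suffer.
I always query Elasticsearch on a single field. For example, my messages input looks like this: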

apple 
orange

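I have an index fruit and want to run the equivalent of ?q=apple or orange. I know I have to use should in Elasticsearch. My question is how to query ES with the values received from the RabbitMQ stream.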
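For the URI-search form mentioned above (?q=...), one option is to join the batch's values with OR in Lucene query-string syntax, so the whole batch becomes a single request. The sketch below is plain Java with no Elasticsearch dependency; the class UriQueryBuilder and the field name fruit_type are hypothetical (the field name is borrowed from the answer). It backslash-escapes query-string special characters and drops < and >, which the Elasticsearch query-string syntax cannot escape. The operator must be written in uppercase (OR) to be treated as an operator rather than as a search term.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public class UriQueryBuilder {
    // Query-string characters that need a backslash escape.
    private static final String SPECIAL = "+-=&|!(){}[]^\"~*?:\\/";

    // Escapes one term; '<' and '>' cannot be escaped, so they are removed.
    static String escape(String term) {
        StringBuilder sb = new StringBuilder();
        for (char c : term.toCharArray()) {
            if (c == '<' || c == '>') continue;
            if (SPECIAL.indexOf(c) >= 0) sb.append('\\');
            sb.append(c);
        }
        return sb.toString();
    }

    // Builds e.g. fruit_type:(apple OR orange) from one batch of stream values.
    static String buildQueryString(String field, List<String> terms) {
        String joined = terms.stream()
                .map(String::trim)
                .filter(t -> !t.isEmpty())
                .map(UriQueryBuilder::escape)
                .collect(Collectors.joining(" OR "));
        if (joined.isEmpty()) {
            throw new IllegalArgumentException("batch contains no searchable terms");
        }
        return field + ":(" + joined + ")";
    }

    // Form-encodes the query string for use as the q parameter (Java 10+ URLEncoder overload).
    static String buildSearchPath(String index, String field, List<String> terms) {
        String q = buildQueryString(field, terms);
        return "/" + index + "/_search?q=" + URLEncoder.encode(q, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<String> batch = List.of("apple ", "orange");
        // prints "/fruit/_search?q=fruit_type%3A%28apple+OR+orange%29"
        System.out.println(buildSearchPath("fruit", "fruit_type", batch));
    }
}
```

URI search is convenient for quick checks, but the request-body bool/should query used in the answer is easier to extend and avoids query-string escaping altogether.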

d6kp6zgx

This code makes only one call to the Elasticsearch server (essentially, it builds a single query containing one should clause per message):

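import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

// Elasticsearch 2.x Java API (TransportClient)
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

public class FruitQuery {

    public static void main(String[] args) throws UnknownHostException {

        Client client = TransportClient.builder().build()
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));

        // In the streaming job these would be the values of one micro-batch
        List<String> messages = new ArrayList<>();
        messages.add("apple");
        messages.add("orange");

        String index = "fruit";
        String fieldName = "fruit_type";

        // With only should clauses, a document matches if at least one clause matches
        BoolQueryBuilder query = QueryBuilders.boolQuery();

        for (String message : messages) {
            query.should(QueryBuilders.matchQuery(fieldName, message));
            // alternative if the field is not analyzed:
            // query.should(QueryBuilders.termQuery(fieldName, message));
        }

        int size = 60; // only 10 hits are returned by default, so raise this if you expect more
        SearchResponse response = client.prepareSearch(index).setQuery(query).setSize(size).execute().actionGet();

        long totalHits = response.getHits().getTotalHits();
        System.out.println("Found " + totalHits + " documents");
        for (SearchHit hit : response.getHits().getHits()) {
            System.out.println(hit.getSource());
        }

        client.close(); // release the transport client's connections and threads
    }
}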
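In the streaming job, this single query would be built once per micro-batch rather than once per message, for example by calling rdd.collect() inside messages.foreachRDD(...). That brings the batch to the driver, so it assumes micro-batches are small. A bool query can hold only a limited number of clauses (indices.query.bool.max_clause_count, 1024 by default in the Elasticsearch versions that ship TransportClient), so a large batch may need splitting. The plain-Java sketch below uses a hypothetical BatchChunker helper to trim, dedupe and chunk a batch; each chunk would then feed the should loop above as one request.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class BatchChunker {
    // Assumed cap, matching the default bool clause limit mentioned above.
    static final int MAX_CLAUSES = 1024;

    // Trims values, drops blanks and duplicates (keeping first-seen order),
    // then splits the result into chunks of at most maxPerQuery terms.
    static List<List<String>> chunk(List<String> batch, int maxPerQuery) {
        if (maxPerQuery <= 0) {
            throw new IllegalArgumentException("maxPerQuery must be positive");
        }
        Set<String> unique = new LinkedHashSet<>();
        for (String s : batch) {
            String t = s.trim();
            if (!t.isEmpty()) unique.add(t);
        }
        List<String> terms = new ArrayList<>(unique);
        List<List<String>> chunks = new ArrayList<>();
        for (int i = 0; i < terms.size(); i += maxPerQuery) {
            chunks.add(new ArrayList<>(terms.subList(i, Math.min(i + maxPerQuery, terms.size()))));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("apple ", "orange", "apple", "", "kiwi");
        // prints "[[apple, orange], [kiwi]]"
        System.out.println(chunk(batch, 2));
        // With the real limit, this small batch fits in one query: [[apple, orange, kiwi]]
        System.out.println(chunk(batch, MAX_CLAUSES));
    }
}
```

Deduplicating first also keeps repeated values in a batch (common with stream input) from inflating the query.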

Generated query:

{
  "bool" : {
    "should" : [ {
      "match" : {
        "fruit_type" : {
          "query" : "apple",
          "type" : "boolean"
        }
      }
    }, {
      "match" : {
        "fruit_type" : {
          "query" : "orange",
          "type" : "boolean"
        }
      }
    } ]
  }
}
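The answer uses the Java TransportClient. If you talk to Elasticsearch over HTTP instead, the same idea becomes one request to /fruit/_search carrying a bool/should body. The dependency-free sketch below builds that body as a string; the class name ShouldQueryJson is hypothetical, it uses the short match form {"field": "value"} rather than the expanded form shown above, and a real project would more likely use a JSON library for escaping.

```java
import java.util.List;

public class ShouldQueryJson {
    // Minimal JSON string escaping: quotes, backslashes and control characters.
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // One match clause per term inside a single bool/should, i.e. one request per batch.
    static String buildBody(String field, List<String> terms, int size) {
        StringBuilder sb = new StringBuilder("{\"size\":").append(size)
                .append(",\"query\":{\"bool\":{\"should\":[");
        for (int i = 0; i < terms.size(); i++) {
            if (i > 0) sb.append(',');
            sb.append("{\"match\":{").append(jsonString(field)).append(':')
              .append(jsonString(terms.get(i))).append("}}");
        }
        return sb.append("]}}}").toString();
    }

    public static void main(String[] args) {
        // prints {"size":60,"query":{"bool":{"should":[{"match":{"fruit_type":"apple"}},{"match":{"fruit_type":"orange"}}]}}}
        System.out.println(buildBody("fruit_type", List.of("apple", "orange"), 60));
    }
}
```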
