我用的是spark2。我正在尝试从rabbitmq获取搜索文本流,并对elasticsearch进行查询。
params.put("hosts", "IP");
params.put("queueName", "query");
params.put("exchangeName", "Exchangequery");
params.put("vHost", "/");
params.put("userName", "test");
params.put("password", "test");
Function<byte[], String> messageHandler = new Function<byte[], String>() {
public String call(byte[] message) {
return new String(message);
}
};
JavaReceiverInputDStream<String> messages = RabbitMQUtils.createJavaStream(jssc, String.class, params, messageHandler);
messages.foreachRDD();
上面的代码从rabbitmq接收stram。但我不知道如何连接es和查询流批处理。一件事是,如果我用 messages.foreachRDD();
并查询每个输入项的elasticsearch,则会影响性能。
我总是只使用一个字段查询elasticsearch。例如
我的战略 messages
输入如下
apple
orange
我有一个索引 fruit
我想问一下 ?q=apple or orange
. 我知道我必须用 should
在ElasticSearch中。我的问题是如何使用从rabbitmq流接收的值查询es
1条答案
按热度按时间d6kp6zgx1#
代码只对elasticsearch服务器进行了一次调用(基本上它构造了一个包含大量should子句的查询)
生成的查询: