我想从storm中将文档索引到elasticsearch,但我无法将任何文档索引到elasticsearch。
在我的拓扑中,我有一个kafkaspout,它向esbolt发出一个json,比如{“tweetid”:1,“text”:“hello”},esbolt是elasticsearch hadoop库中的本机bolt,它将风暴元组写入elasticsearch(doc在这里:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html). 以下是我的esbolt的配置:
Map conf = new HashMap();
conf.put("es.nodes","127.0.0.1");
conf.put("es.port","9200");
conf.put("es.resource","twitter/tweet");
conf.put("es.index.auto.create","no");
conf.put("es.input.json", "true");
conf.put("es.mapping.id", "tweetId");
EsBolt elasticsearchBolt = new EsBolt("twitter/tweet", conf);
前两个配置默认有这些值,但我选择显式设置它们。我也尝试过不用它们,得到同样的结果。
这就是我构建拓扑的方式:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(TWEETS_DATA_KAFKA_SPOUT_ID, kafkaSpout, kafkaSpoutParallelism)
.setNumTasks(kafkaSpoutNumberOfTasks);
builder.setBolt(ELASTICSEARCH_BOLT_ID, elasticsearchBolt, elasticsearchBoltParallelism)
.setNumTasks(elasticsearchBoltNumberOfTasks)
.shuffleGrouping(TWEETS_DATA_KAFKA_SPOUT_ID);
return builder.createTopology();
在本地运行拓扑之前,我在elasticsearch中创建了“twitter”索引,并为该索引创建了一个Map“tweet”。如果检索新创建的类型(curl-xget)的Map,就会得到这样的结果http://localhost:9200/twitter/\u mapping/tweet'):
{
"twitter": {
"mappings": {
"tweet": {
"properties": {
"text": {
"type": "string"
},
"tweetId": {
"type": "string"
}
}
}
}
}
}
我在本地运行拓扑,这是我在控制台中处理元组时得到的结果:
Processing received message FOR 6 TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]
Emitting: elasticsearch-bolt __ack_ack [-8010897758788654352 -6240339405307942979]
TRANSFERING tuple TASK: 2 TUPLE: source: elasticsearch-bolt:6, stream: __ack_ack, id: {}, [-8010897758788654352 -6240339405307942979]
BOLT ack TASK: 6 TIME: TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]
Execute done TUPLE source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}] TASK: 6 DELTA:
所以元组似乎被处理了。但是,我没有在elasticsearch中索引任何文档。
当我为esbolt设置配置时,我想我做错了什么,可能是缺少了一个配置或什么的。
2条答案
按热度按时间dauxcl2d1#
我也遇到了同样的问题,但是当我查找es hadoop文档时,我发现因为我没有设置触发队列刷新的频率,很好。但是当我们设置config.topology\u tick\u tuple\u freq\u secs的值时,会抛出一个异常:java.lang.runtimeexception:java.lang.nullpointerexception in 螺栓执行功能。然后我们使用调试模式来测试我的拓扑,我发现bolt execute中的输入元组不包含任何条目,但是这个空元组被触发了。这就是我感到困惑的地方。不知道元组会根据设置时间发出,即使这个元组在我们设置config.topology\u tick\u tuple\u freq\u secs之后是空的,我想这是一个bug。在此处输入图像描述在此处输入图像描述
你可以提供更多信息see:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html
oyt4ldly2#
只有达到由es.storm.bolt.flush.entries.size指定的刷新大小,文档才会被索引
或者,您可以设置触发队列刷新的勾选频率。
默认情况下,es hadoop会根据es.storm.bolt.tick.tuple.flush参数刷新tick。