我们在玩Kafka,elasticsearch和logstash。为了进行试验,我们希望构建一个小型服务:
当用户点击一个给定的链接时产生“点击”信息,并将其推送到Kafka
将这些点击消息编入elasticsearch
聚合这些单击消息并将它们推送到Kafka。
现在我们已经有了一个服务,它可以生成点击消息并将它们推送到kafka主题中,我们使用logstash来读取这些消息并将它们推送到elasticsearch中。
我们现在需要的是一种生成包含聚合结果的新Kafka消息的方法。我们尝试使用以下日志存储配置文件(此时聚合请求完全是随机的):
input {
elasticsearch {
hosts => "localhost"
query => '{
"query": {
"query_string": {
"query": "*",
"analyze_wildcard": true
}
},
"size": 0,
"aggs": {
"messages": {
"value_count": {
"field": "message"
}
}
}
}'
}
}
output {
kafka {
topic_id => 'aggregated_stats'
}
}
不幸的是,elasticsearch向我们提供了以下错误消息:
org.elasticsearch.ElasticsearchIllegalArgumentException: aggregations are not supported with search_type=scan
此外,logstash似乎在生成单个消息后立即退出,而我们希望它随着聚合统计信息的更改而不断生成新消息。
这些有什么意义吗?
1条答案
按热度按时间k75qkfdt1#
在输出配置中添加“broker\u list”时,它在我的服务器上工作。