使用elasticsearch+logstash将聚合统计数据推送到kafka

hzbexzde  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(370)

我们在玩Kafka,elasticsearch和logstash。为了进行试验,我们希望构建一个小型服务:
当用户点击一个给定的链接时产生“点击”信息,并将其推送到Kafka
将这些点击消息编入elasticsearch
聚合这些单击消息并将它们推送到Kafka。
现在我们已经有了一个服务,它可以生成点击消息并将它们推送到kafka主题中,我们使用logstash来读取这些消息并将它们推送到elasticsearch中。
我们现在需要的是一种生成包含聚合结果的新Kafka消息的方法。我们尝试使用以下日志存储配置文件(此时聚合请求完全是随机的):

input {
    elasticsearch {
        hosts => "localhost"
        query => '{
                      "query": {
                        "query_string": {
                          "query": "*",
                          "analyze_wildcard": true
                        }
                      },
                      "size": 0,
                      "aggs": {
                          "messages": {
                              "value_count": {
                                  "field": "message"
                              }
                          }
                      }
                  }'
    }
}
output {
    kafka {
        topic_id => 'aggregated_stats'
    }
}

不幸的是,elasticsearch向我们提供了以下错误消息:

org.elasticsearch.ElasticsearchIllegalArgumentException: aggregations are not supported with search_type=scan

此外,logstash似乎在生成单个消息后立即退出,而我们希望它随着聚合统计信息的更改而不断生成新消息。
这些有什么意义吗?

k75qkfdt

k75qkfdt1#

在输出配置中添加“broker\u list”时,它在我的服务器上工作。

output {
      kafka {
      broker_list => "x.x.x.x:9092,y.y.y.y:9092"
      topic_id => "aggregated_stats"
      }
}

相关问题