在将完整日志发送到logstash/elasticsearch之前,将一半日志发送到rabbitmq的方法

dwthyt8l  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(538)

我有两个函数,每个函数都创建特定于一个事务的日志;它是一个多线程应用程序,因此func1的函数项对于事务可以是随机的,但是对于单个事务,它将仅通过func1、func2和func3顺序。

func1(transactionId) {
     log("%d Now in func1", transactionId);
}

func2(transactionId) {
     log("%d Now in func2", transactionId);
}

func3(transactionId) {
     log("%d Now in func3", transactionId);
}

现在,我想一次只为每个事务写入logstash;就是这样

1 Now in func1 Now in func2 Now in fun3

最后这需要转到ElasticSearch;
我正在考虑将半事务日志写入rabbitmq临时队列,然后在完成事务后,将其提交给rabbitmq生产者队列,将消息发送到logstash;
喜欢

func1(transactionId) {
     add2RMQ(transactionId, "Now in func1");
}

func2(transactionId) {
     add2RMQ("transactionId, "Now in func2");
}

func3(transactionId) {
      add2RMQ("transactionId, "Now in func3");
      /* Last point of transaction */
      commit2RMQ(transactionId);
}

时间commit2rmq execute the logstash应该会收到特定于要写入elasticsearch的事务的完整消息。
问题:
解决此问题的正确解决方案是将特定于事务的数据一次发送到elasticsearch?
我们能用rabbitmq解决这个问题吗?如果是这样的话,我需要使用什么样的api?
有没有什么方法可以不用rabbitmq而只使用logstash和elasticsearch实现同样的效果?
我不想使用elasticsearch更新api,因为它可能会为特定于某个事务的每个日志消息消耗大量搜索操作。

gupuwyp2

gupuwyp21#

尝试聚合与单个事务相关的不同日志行并不是一个简单的问题,尤其是当您将消息队列系统添加到mix中作为要聚合的日志的中间存储时。我会走另一条路,不涉及像rabbitmq这样的子系统。
此外,如果您试图将多个日志行连接到一个日志行中,则会丢失每个日志行可以提供的详细信息,例如每个函数执行所用的时间。如果 func2 ,分别 func3 ,引发异常?你应该存储一个只包含 func1 仅分别为 func1 以及 func2 ?
我将要写的东西可能可以转换成任何语言和任何日志解决方案,但是为了演示的目的,我假设您的程序是用java编写的,您使用的是log4j。
因此,我将利用log4j的mapped diagnostic context(mdc)在每个日志行中存储事务id(以及可能的其他数据,如用户名等)。这样您就可以轻松地检索与单个事务相关的所有日志行。这样做的好处是,您不必聚合任何内容,只需提供足够的上下文信息,以便kibana以后可以为您做这些。
在伪代码中,您将事务id直接添加到消息中。使用mdc而不是将id记录到消息中的优点是,它可以避免解析logstash中的所有消息,从而重新发现在创建日志行时已经知道的事务id。
因此,在代码中,一旦有了事务id,就可以将其添加到当前的每线程日志上下文中,如下所示:

import org.apache.log4j.MDC;

...
func1(transactionId) {
     // add the transaction ID to the logging context
     MDC.put("transactionID", transactionId);
     log("Now in func1");
}

func2(transactionId) {
     log("Now in func2");
}

func3(transactionId) {
     log("Now in func3");
}

然后在log4j配置文件中,可以使用 %X{transactionID} 模式以存储它,在本例中,我将它添加到线程名称之后,但您可以将它放在任何您喜欢的地方:

log4j.appender.consoleAppender.layout.ConversionPattern = %d [%t] [%X{transactionID}] %5p %c - %m%n

您的日志将类似于:

2015-09-28T05:07:28.425Z [http-8084-2] [625562271762]  INFO YourClass - Now in func1
2015-09-28T05:07:29.776Z [http-8084-2] [625562271762]  INFO YourClass - Now in func2
2015-09-28T05:07:30.652Z [http-8084-2] [625562271762]  INFO YourClass - Now in func3
                                              ^
                                              |
                                  the transaction ID is here

当您有这样的日志行时,那么通过一个logstash检索事务id就轻而易举了 grok 并将其存储在自己的 transactionID 日志存储索引中的字段。在kibana中,您可以搜索事务id并按timestamp desc排序,这样就可以显示该事务的所有上下文。
试试看!

相关问题