logstash 如何生成每日重置的单调递增数

omqzjyyz  于 2023-11-15  发布在  Logstash
关注(0)|答案(2)|浏览(225)

因此,我正在研究一个项目,将非结构化日志转换为结构化日志,该日志在2秒的微批中以近乎真实的的方式接收。
应用程序写入事件日志。每个日志在其消息中有一个字段,即事件ID。使用此事件ID,我们可以隔离不同的事件。
在每个事件中,需要提取的字段数是不同的。例如,如果事件ID为1,则通过解析该日志,我们将获得5个有用的字段。在事件ID为2时,它可能为10。此外,字段的含义也取决于事件。无论我们从事件ID为1的位置2获得什么值,它都与我们从事件ID为2的位置2获得的值绝对不同。结果每个事件ID在单独的模式中。
我想把这个事件存储在parquet中,所以我必须指定严格的模式。为此,我把所有的事件分开,然后把事件转换到合适的模式。但是在这个例子中,我丢失了所有事件到达或发生的顺序。我想这应该是使数据结构化的非常正式和标准的问题。我们如何有效和可靠地处理这种情况呢?我使用的是logstash->Kafka->电Spark流堆栈。
我想的一个解决方案是,我可以给每个消息添加递增计数器。由于某些原因,我不能在事件生成时添加这个递增的数字。我必须在logstash或Kafka或spark-streaming中处理它。由于kafka和spark-streaming是分布式的。我们维护单个顺序计数器似乎很困难(也想得到这个建议)。
所以我想把它加到logstash里。我想到了下面的解决方案。有一个计数器,然后增加一个。但是由于负载很重,它可能会在一段时间内超过这个数字。我们的更新很少,所以一旦我启动logstash,它可能会在一次启动中增加5年。这也会导致达到最大限制。

Q1)因此,我想实施计数器,以便在一天结束时将其重置为0。因为我们的数据是按天分区的。尝试以以下方式实施,但缺点是每次必须增加时,都必须检查条件。是否有更好的方法来处理这种情况?

# I tested i have around 5000 messages coming in single second
seqCounter = 0

def increase():
   seqCounter +=1
   if (current_time == day_end_time): # checking this condition will add overhead it seems.
      seqCounter = 0

字符串

第二季度)可靠性方面也有一个问题。由于系统可能会失败,或者由于某些原因,需要重新启动logstash,这样也会丢失当前序列。当它重新启动时,将从0开始。为了处理这个问题,我们可以缓存当前序列,但我猜这会增加潜在的io延迟,因为在文件中的每个增量编号都需要更新。此外,这会导致重复的序列号。是否存在是否有更好的方法来处理这种情况?
Q3)我可以使用Kafka偏移量,但它是基于每个分区的。因此,如果单个主题向多个分区发送数据(我的情况就是这样),偏移量将重复。是否有更好的方法来处理这种情况?
Q4)是否有任何其他解决方案可以代替logstash来处理此类情况?
**注意事项:在任何一个解决方案中,主要的目标都是获取序列号。序列号应该是单调递增的,这样我就可以通过基于此的排序来获取事件流。不需要连续的序列号。

任何帮助都是感激不尽的。

ttvkxqim

ttvkxqim1#

尝试在Logstash中生成此序列号仅在Logstash有单个worker(即-w 1)时才有效,否则如果有多个worker,事件将不会按生成顺序处理,而是根据有多少个worker并行处理。
也就是说,通过强制Logstash与单个worker一起工作,你会人为地减慢它的速度,所以这不是最好的主意。此外,正如你所指出的,你需要处理Logstash崩溃或需要重新启动的情况。
这个序列号必须是在Logstash之外生成的。首先想到的是使用Redis INCR,但是没有official redis filter plugins
另一个想法是使用一个SQL序列(例如Postgres),您可以轻松地使用jdbc_streaming filter plugin进行查询。

input {
   ...
}
filter {
  jdbc_streaming {
    jdbc_driver_library => "/path/to/jdbc-connector.jar"
    jdbc_driver_class => "com.product.jdbc.Driver"
    jdbc_connection_string => "jdbc:xxsql://localhost:1234/mydatabase"
    jdbc_user => "me"
    jdbc_password => "secret"
    statement => "SELECT nextval('sequence');"
    target => "sequence"
  }
}

字符串
只有当你已经有一个支持序列的数据库时,这才是一个可行的选择,但如果你需要为此设置一个数据库,那就不是了。另外,根据每秒生成的事件数量,必须查询远程数据库以获得序列号可能会成为瓶颈。
有些插件存在,比如this one,但是序列值只在内存中,如果Logstash重新启动,序列值会丢失。你也可以create your own Java filter plugin来生成一个递增的序列值,但是你仍然需要找到一种方法来在重新启动时保持该值。
有一些正在进行的问题(如#6997)开始工作,但大多数都是陈旧的。
最好的选择仍然是在创建事件时生成序列号。

qvsjd97n

qvsjd97n2#

如果你可以接受一个复合的160位字段(一个int,两个long),Quality的unique_id提供了一个唯一的id,它的要求非常简单(你的集群节点的唯一mac地址,你不回滚驱动节点上的时间)。
好处是它总是递增的,相关的插入存储在一起,而且存储和检索都很快。

相关问题