问题陈述
我们使用tail
和subprocess.Popen
将日志数据摄取到自定义应用程序中。此应用程序预计可在各种 *NIX系统上运行,包括OpenBSD、RedHat Linux和Debian 10 + 11。
我们使用subprocess.Popen
生成一个新的子进程,调用系统安装的tail
二进制文件来从文件中收集日志数据。使用系统安装版本的tail
被认为是最有效的,因为它使用inotify
和轮询来发现对文件的更改。
子进程通过STDOUT
推送输出,由DefaultSelector
创建的IO多路复用器处理与我们的python应用程序的通信。根据文档,建议在异构 *NIX系统上运行时使用DefaultSelector
,并选择正确的选择器对象。IO Selector Doc
通过测试,我们注意到read1
调用返回的IO可能包含虚线。例如,文件中包含29834
的行可以作为两条消息返回:2983
和4
。
问题
好奇这是python中选择器模块内的竞争条件还是系统上的竞争条件。减慢对日志文件的写入可以减少这种异常,但不能完全消除它。我创建了一个POC用于测试和进一步解释。我们需要一个解决方案,在应用程序或系统级别解决此问题。也可以接受的是,这是在这个庄园跟踪文件的错误方法。
示例及设置说明
设置
可用于重现问题的两个脚本。首先,tailp.py
将跟踪文件并将输出打印到父进程STDOUT
。第二个脚本write_logs.py
将简单地将30K日志行写入目标文件./log.log
。
1.在本地目录touch log.log
中创建目标日志文件。
1.在终端中运行python3 tailp.py
。
1.在单独的终端运行python3 write_logs.py
。
1.在tailp.py
的终端会话中,应该发出日志行。可能会有一些破碎的数字,例如:
29829
29830
29831
29832
29833
2983 <-- Should be a single line
4 <--
29835
29836
29837
29838
29839
29840
29841
脚本
tailp.py
import selectors
import logging
import subprocess
from shlex import quote
from shutil import which
from threading import Event
def intake_gateway_push(datum):
"""
Function used to push data into a mocked component.
"""
print(datum)
def tail(filename: str, stop_event: Event):
"""
Tails a file and pushes data into intake gateway.
:param filename: full path to file
:param stop_event: event used to send stop signal
"""
with subprocess.Popen(
[which("tail"), "-f", "-n", "0", quote(filename)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
) as tail_process:
logging.info("Creating tail process with PID '%s'", tail_process.pid)
# Using 'DefaultSelector' as this is expected to run
# on Linux and BSD systems
selector = selectors.DefaultSelector()
selector.register(tail_process.stdout, selectors.EVENT_READ)
# Loops when 'selector.select' times out, exits
# when stop event is sent.
while not stop_event.is_set():
for key, _ in selector.select(timeout=5.0):
# Decoding file handler data into utf-8 and separating data
# by newlines.
data = key.fileobj.read1().decode('utf-8').split("\n")
# Removing all empty strings produced by 'split(\n)'.
data = [ datum for datum in data if datum != '' ]
# Pushing each log line into mocked gateway function.
for datum in data:
intake_gateway_push(datum)
logging.info("Unregistering selector from tail PID '%s'", tail_process.pid)
selector.unregister(tail_process.stdout)
# Tail process is usually defunct and does not exit on OpenBSD7.3
# explicity calling 'kill' solves this, need to investigate how
# the context manager is cleaning up child processes.
tail_process.kill()
if __name__ == "__main__":
stop_event = Event()
tail("log.log", stop_event)
write_logs.py
import time
def write_logs(count: int):
"""
Write specified quantity of logs to file.
:param count: number of log lines to write
"""
with open(f"./log.log", mode="w", encoding="utf-8") as file:
for i in range(count):
file.write(f"{i}\n")
time.sleep(0.001)
if __name__ == "__main__":
write_logs(30000)
1条答案
按热度按时间jv4diomz1#
在单次读取操作中未接收到完整行时出现问题。如果接收到的数据的最后一个字符不是换行符,则意味着该行不完整。不完整的行将被存储以供下一次迭代使用,并且下一次读取操作将向其追加更多数据。然而,后续拆分操作仍可能导致处理不完整的行。
示例1.
不完整的行存储在
incomplete_line
变量中。该变量随着每次迭代而更新,以将不完整的线与接收到的新数据沿着累积。然后将组合的数据拆分成行,并处理除最后一行之外的所有完整行。最后一行被存储为可能不完整,用于下一次迭代。示例2.
对于更有效的解决方案,您可以使用相同的过程,但不是收集不完整的行,而是以流式方式处理数据,并在可用时收集完整的行。
注:此解决方案假设数据以
\n
终止,因此请根据您的需要进行调整。