linux Python I/O Multiplexer中的潜在竞争条件

aij0ehis  于 2023-05-16  发布在  Linux
关注(0)|答案(1)|浏览(147)

问题陈述

我们使用tailsubprocess.Popen将日志数据摄取到自定义应用程序中。此应用程序预计可在各种 *NIX系统上运行,包括OpenBSD、RedHat Linux和Debian 10 + 11。
我们使用subprocess.Popen生成一个新的子进程,调用系统安装的tail二进制文件来从文件中收集日志数据。使用系统安装版本的tail被认为是最有效的,因为它使用inotify和轮询来发现对文件的更改。
子进程通过STDOUT推送输出,由DefaultSelector创建的IO多路复用器处理与我们的python应用程序的通信。根据文档,建议在异构 *NIX系统上运行时使用DefaultSelector,并选择正确的选择器对象。IO Selector Doc
通过测试,我们注意到read1调用返回的IO可能包含虚线。例如,文件中包含29834的行可以作为两条消息返回:29834

问题

好奇这是python中选择器模块内的竞争条件还是系统上的竞争条件。减慢对日志文件的写入可以减少这种异常,但不能完全消除它。我创建了一个POC用于测试和进一步解释。我们需要一个解决方案,在应用程序或系统级别解决此问题。也可以接受的是,这是在这个庄园跟踪文件的错误方法。

示例及设置说明

设置

可用于重现问题的两个脚本。首先,tailp.py将跟踪文件并将输出打印到父进程STDOUT。第二个脚本write_logs.py将简单地将30K日志行写入目标文件./log.log
1.在本地目录touch log.log中创建目标日志文件。
1.在终端中运行python3 tailp.py
1.在单独的终端运行python3 write_logs.py
1.在tailp.py的终端会话中,应该发出日志行。可能会有一些破碎的数字,例如:

29829
29830
29831
29832
29833
2983     <-- Should be a single line
4        <--
29835
29836
29837
29838
29839
29840
29841

脚本

tailp.py

import selectors
import logging
import subprocess
from shlex import quote
from shutil import which
from threading import Event

def intake_gateway_push(datum):
    """
    Function used to push data into a mocked component.
    """
    print(datum)

def tail(filename: str, stop_event: Event):
    """
    Tails a file and pushes data into intake gateway.

    :param filename: full path to file
    :param stop_event: event used to send stop signal
    """
    with subprocess.Popen(
        [which("tail"), "-f", "-n", "0", quote(filename)],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    ) as tail_process:
        logging.info("Creating tail process with PID '%s'", tail_process.pid)

        # Using 'DefaultSelector' as this is expected to run
        # on Linux and BSD systems
        selector = selectors.DefaultSelector()
        selector.register(tail_process.stdout, selectors.EVENT_READ)

        # Loops when 'selector.select' times out, exits
        # when stop event is sent.
        while not stop_event.is_set():
            for key, _ in selector.select(timeout=5.0):

                # Decoding file handler data into utf-8 and separating data
                # by newlines.
                data = key.fileobj.read1().decode('utf-8').split("\n")

                # Removing all empty strings produced by 'split(\n)'.
                data = [ datum for datum in data if datum != '' ]

                # Pushing each log line into mocked gateway function.
                for datum in data:
                    intake_gateway_push(datum)
        logging.info("Unregistering selector from tail PID '%s'", tail_process.pid)
        selector.unregister(tail_process.stdout)
        
        # Tail process is usually defunct and does not exit on OpenBSD7.3
        # explicity calling 'kill' solves this, need to investigate how
        # the context manager is cleaning up child processes.
        tail_process.kill()

if __name__ == "__main__":
    stop_event = Event()
    tail("log.log", stop_event)

write_logs.py

import time

def write_logs(count: int):
    """
    Write specified quantity of logs to file.

    :param count: number of log lines to write
    """
    with open(f"./log.log", mode="w", encoding="utf-8") as file:
        for i in range(count):
            file.write(f"{i}\n")
            time.sleep(0.001)

if __name__ == "__main__":
    write_logs(30000)
jv4diomz

jv4diomz1#

在单次读取操作中未接收到完整行时出现问题。如果接收到的数据的最后一个字符不是换行符,则意味着该行不完整。不完整的行将被存储以供下一次迭代使用,并且下一次读取操作将向其追加更多数据。然而,后续拆分操作仍可能导致处理不完整的行。

示例1.

不完整的行存储在incomplete_line变量中。该变量随着每次迭代而更新,以将不完整的线与接收到的新数据沿着累积。然后将组合的数据拆分成行,并处理除最后一行之外的所有完整行。最后一行被存储为可能不完整,用于下一次迭代。

def tail(filename: str, stop_event: Event):
    """
    Tails a file and pushes data into intake gateway.

    :param filename: full path to file
    :param stop_event: event used to send stop signal
    """
    with subprocess.Popen(
        [which("tail"), "-f", "-n", "0", quote(filename)],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    ) as tail_process:
        logging.info("Creating tail process with PID '%s'", tail_process.pid)

        # Using 'DefaultSelector' as this is expected to run
        # on Linux and BSD systems
        selector = selectors.DefaultSelector()
        selector.register(tail_process.stdout, selectors.EVENT_READ)

        incomplete_line = ''
        # Loops when 'selector.select' times out, exits
        # when stop event is sent.
        while not stop_event.is_set():
            for key, _ in selector.select(timeout=5.0):
                # Read data from fileobj
                data = key.fileobj.read1().decode('utf-8')

                # Combine the incomplete line with the new data
                data = incomplete_line + data

                # Split the data into lines
                lines = data.split("\n")

                # Process all complete lines except the last one
                for line in lines[:-1]:
                    # Pushing each log line into mocked gateway function.
                    intake_gateway_push(line)

                # The last line might be incomplete, so store it for the next iteration
                incomplete_line = lines[-1]
                
        logging.info("Unregistering selector from tail PID '%s'", tail_process.pid)
        selector.unregister(tail_process.stdout)
        
        # Tail process is usually defunct and does not exit on OpenBSD7.3
        # explicity calling 'kill' solves this, need to investigate how
        # the context manager is cleaning up child processes.
        tail_process.kill()

示例2.

对于更有效的解决方案,您可以使用相同的过程,但不是收集不完整的行,而是以流式方式处理数据,并在可用时收集完整的行。
注:此解决方案假设数据以\n终止,因此请根据您的需要进行调整。

def tail(filename: str, stop_event: Event):
    """
    Tails a file and pushes data into intake gateway.

    :param filename: full path to file
    :param stop_event: event used to send stop signal
    """
    with subprocess.Popen(
        [which("tail"), "-f", "-n", "0", quote(filename)],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    ) as tail_process:
        logging.info("Creating tail process with PID '%s'", tail_process.pid)

        # Using 'DefaultSelector' as this is expected to run
        # on Linux and BSD systems
        selector = selectors.DefaultSelector()
        selector.register(tail_process.stdout, selectors.EVENT_READ)

        incomplete_line = ''
        # Loops when 'selector.select' times out, exits
        # when stop event is sent.
        while not stop_event.is_set():
            for key, _ in selector.select(timeout=5.0):
                # Read data from fileobj
                data = key.fileobj.read1().decode('utf-8')

                # Combine the incomplete line with the new data
                data = incomplete_line + data
                
                # Process each character in the data
                start = 0
                for i in range(len(data)):
                    if data[i] == '\n':
                        # Process the complete line
                        line = data[start:i]
                        # Pushing the log line into mocked gateway function.
                        intake_gateway_push(line)
                        start = i + 1

                # The last line might be incomplete, so store it for the next iteration
                # incomplete_line = lines[-1]
                incomplete_line = data[start:]
                
        logging.info("Unregistering selector from tail PID '%s'", tail_process.pid)
        selector.unregister(tail_process.stdout)
        
        # Tail process is usually defunct and does not exit on OpenBSD7.3
        # explicity calling 'kill' solves this, need to investigate how
        # the context manager is cleaning up child processes.
        tail_process.kill()

相关问题