优化C++多线程服务器应用程序中的日志记录,以提高性能和防崩溃数据完整性

vhmi4jdf  于 2023-05-20  发布在  其他
关注(0)|答案(3)|浏览(122)

我有一个复杂的服务器应用程序(Windows/Linux),其中包含多个线程,我需要偶尔启用详细日志记录来跟踪bug。但是,日志记录过程会大大降低应用程序的速度,因为在我当前的实现中,每个日志条目都会立即刷新到磁盘。
我想找到一种方法来摆脱即时刷新,但我不确定是否有任何其他选项可以保证所有报告的日志条目写入文件,如果应用程序崩溃。
我正在考虑的一个选择是运行一个单独的线程进行日志记录。在这种方法中,其他线程只会在等待将其日志条目插入到线程安全队列中时经历减速。但是,我不确定如果应用程序在主线程中崩溃,是否有可能确保队列中的所有条目都写入磁盘。Linux信号或Windows的SetUnhandledExceptionFilter是否可以以允许这样的线程完成其工作的方式使用?
另一个考虑的意见是通过UDP/TCP环回将日志条目发送到另一个进程(例如,Linux上的syslog)。但是,我不确定这种方法是否能提高速度,或者在应用程序崩溃时是否总是发送套接字缓冲区。此外,实施此解决方案肯定会增加运行应用程序所需的基础架构的复杂性,尤其是在Windows上。

xt0899hw

xt0899hw1#

需求

如果我对您的需求理解正确的话,有一个应用程序具有多个线程,它们记录到某种类型的单个归档文件(例如而且,即使程序意外终止,从已完成的调用到日志的所有信息也必须成功存档。
我相信,要实现捕获achive中所有信息的严格要求,唯一的方法就是强制同步日志记录。如果写入不同步,则总会有一个窗口,在该窗口中可能会丢失一些信息。如果严格执行,这将排除使用专用线程写入存档等选项。

创意

据我所知,最好的(只有?)以可接受的性能实现该目标的技术是使用某种类型的存储器Map方案(如由@N00byEdge提出的)。我将一些现有的代码整合在一起,构建了这个方案的足够的原型来进行一些测量。
另一个选择是尝试像spdlog这样的高性能日志记录器,它非常强大和性能。我用它来记录日志。但是,如果刷新每个日志消息,它仍然比内存Map文件慢两个数量级(100倍)。

性能结果

根据你的帖子,我构建了一个简单的性能测试,测量使用简单地喷出信息的写入器将200 Mb日志信息写入文件所需的时间,我认为这是最坏的情况。
下面的图显示了记录200 Mb信息所用的时间与日志记录线程数的关系。我在Arm64x86上测量了三个不同的原型:

  • MemoryFile,带有用于同步的自旋锁
  • MemoryFile使用互斥锁进行同步
  • spdlog,但数据减少100倍

如果有兴趣的话,我不介意开发一个更通用的基于内存Map的"file"的开源实现,用于像这样有严格数据保证和性能要求的情况。

示例代码(MemoryFile

#include <atomic>
#include <unistd.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <filesystem>
#include <thread>
#include <vector>
#include <iostream>
#include <chrono>

class SpinLock {
public:
    void lock() {
        while (locked_.test_and_set(std::memory_order_acquire))
            std::this_thread::yield();
    }

    void unlock() {
        locked_.clear(std::memory_order_release);
    }

private:
    std::atomic_flag locked_ = ATOMIC_FLAG_INIT;
};

class SpinLockGuard {
public:
    SpinLockGuard(SpinLock& lock)
        : lock_(lock) {
        lock_.lock();
    }

    ~SpinLockGuard() {
        lock_.unlock();
    }

private:
    SpinLock& lock_;
};

class MemoryFile {
public:
    enum Mode { CREATE, OPEN, CREATE_OR_OPEN };

    struct Header {
        size_t bytes_written{}, bytes_mapped{};
    };

    static constexpr size_t DataOffset = sizeof(Header);

    MemoryFile(const std::string& filename, Mode mode) {
        switch (mode) {
        case CREATE:
            create_chunk(filename);
            break;
        case OPEN:
            open_chunk(filename);
            break;
        case CREATE_OR_OPEN:
            if (std::filesystem::exists(filename)) open_chunk(filename);
            else create_chunk(filename);
            break;
        }
    }

    ~MemoryFile() {
        if (base_)
            munmap(base_, *ptr_bytes_mapped_);
        if (fd_ != -1)
            close(fd_);
    }

    void commit_message(std::string_view msg) {
        // SpinLockGuard guard(lock_);
        std::lock_guard guard(mutex_);
        ensure_room_for_message(msg.size());
        auto nbytes = *ptr_bytes_written_;
        std::copy(msg.begin(), msg.end(), base_ + nbytes);
        nbytes += msg.size();
        *ptr_bytes_written_ = nbytes;
    }

private:
    size_t grow_file(size_t new_size) {
        auto rc = lseek(fd_, new_size - 1, SEEK_SET);
        if (rc + 1 != new_size)
            throw std::runtime_error("error: lseek failed trying to grow file");

        rc = write(fd_, "", 1);
        if (rc != 1)
            throw std::runtime_error("error: write to extend file failed");

        return new_size;
    }

    void map_memory(size_t nbytes) {
        base_ = (char*)mmap(nullptr, nbytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0);
        if (base_ == nullptr)
            throw std::runtime_error("error: failed to map memory");

        Header *header = (Header*)base_;
        ptr_bytes_written_ = &(header->bytes_written);
        ptr_bytes_mapped_ = &(header->bytes_mapped);
        *ptr_bytes_mapped_ = nbytes;
    }

    void create_chunk(const std::string& filename) {
        namespace fs = std::filesystem;
        if (fs::exists(fs::path(filename)))
            throw std::runtime_error("error: chunk already exists");

        fd_ = open(filename.c_str(), O_CREAT | O_RDWR, 0777);
        if (fd_ == -1)
            throw std::runtime_error("error: failed to create chunk");

        auto nbytes = grow_file(1024 * 1024);
        map_memory(nbytes);
        *ptr_bytes_written_ = sizeof(Header);
    }

    void open_chunk(const std::string& filename) {
        namespace fs = std::filesystem;
        if (not fs::exists(fs::path(filename)))
            throw std::runtime_error("error: chunk does not exist");

        fd_ = open(filename.c_str(), O_RDWR, 0777);
        if (fd_ == -1)
            throw std::runtime_error("error: failed to open chunk");

        auto nbytes = fs::file_size(fs::path(filename));
        map_memory(nbytes);
    }

    void ensure_room_for_message(size_t n) {
        auto nw = *ptr_bytes_written_, nm = *ptr_bytes_mapped_;
        if (nw + n >= nm) {
            munmap(base_, nm);
            base_ = nullptr;

            auto new_size = std::max(2 * nm, nw + n + 1);
            auto nbytes = grow_file(new_size);
            map_memory(nbytes);
        }
    }

    // Pointer to the number of bytes written.
    volatile size_t *ptr_bytes_written_{nullptr};

    // Pointer to the number of bytes in the mapped region.
    volatile size_t *ptr_bytes_mapped_{nullptr};

    // Pointer to the mapped region.
    char *base_{nullptr};

    // File descriptor of the mapped region.
    int fd_{-1};

    // Use simple spin-lock for thread synchronization.
    // SpinLock lock_;

    // Use a mutex for thread synchronization.
    std::mutex mutex_;
};

template<class Clock = std::chrono::high_resolution_clock>
class StopWatch
{
public:
    StopWatch()
        : start_tp_(Clock::now())
        , last_tp_(start_tp_)
    { }

    auto now() const
    {
        std::atomic_thread_fence(std::memory_order_relaxed);
        auto current_tp = Clock::now();
        std::atomic_thread_fence(std::memory_order_relaxed);
        return current_tp;
    }

    auto mark()
    {
        auto current_tp = now();
        auto elapsed = current_tp - last_tp_;
        last_tp_ = current_tp;
        return elapsed;
    }

    template<class Units = typename Clock::duration>
    auto elapsed_duration()
    {
        auto elapsed = mark();
        return std::chrono::duration_cast<Units>(elapsed);
    }

    template<class Units = typename Clock::duration>
    auto elapsed_time()
    {
        auto elapsed = mark();
        return std::chrono::duration_cast<Units>(elapsed).count();
    }

private:
    typename Clock::time_point start_tp_;
    typename Clock::time_point last_tp_;
};

int main(int argc, const char *argv[]) {
    const auto filename = "/tmp/x.out";
    size_t nth = argc >= 2 ? atoi(argv[1]) : 1;
    size_t nmsg = argc >= 3 ? atoi(argv[2]) : 1'000'000;
    size_t msg_per_thread = nmsg / nth;

    std::filesystem::remove(std::filesystem::path(filename));
    MemoryFile log(filename, MemoryFile::CREATE);

    StopWatch timer;
    timer.mark();
    std::vector<std::thread> threads;
    for (auto i = 0; i < nth; ++i)
        threads.emplace_back([&]() {
            for (auto i = 0; i < msg_per_thread; ++i)
                log.commit_message("This is a log message\n");
        });

    for (auto& thread : threads)
        thread.join();

    auto ms = timer.elapsed_duration<std::chrono::milliseconds>().count();
    std::cout << ms << " ms" << std::endl;

    return 0;
}

示例代码(spdlog

#include <iostream>
#include <filesystem>
#include "spdlog/spdlog.h"
#include "spdlog/sinks/basic_file_sink.h"

template<class Clock = std::chrono::high_resolution_clock>
class StopWatch
{
public:
    StopWatch()
        : start_tp_(Clock::now())
        , last_tp_(start_tp_)
    { }

    auto now() const
    {
        std::atomic_thread_fence(std::memory_order_relaxed);
        auto current_tp = Clock::now();
        std::atomic_thread_fence(std::memory_order_relaxed);
        return current_tp;
    }

    auto mark()
    {
        auto current_tp = now();
        auto elapsed = current_tp - last_tp_;
        last_tp_ = current_tp;
        return elapsed;
    }

    template<class Units = typename Clock::duration>
    auto elapsed_duration()
    {
        auto elapsed = mark();
        return std::chrono::duration_cast<Units>(elapsed);
    }

    template<class Units = typename Clock::duration>
    auto elapsed_time()
    {
        auto elapsed = mark();
        return std::chrono::duration_cast<Units>(elapsed).count();
    }

private:
    typename Clock::time_point start_tp_;
    typename Clock::time_point last_tp_;
};

using std::cout, std::endl;

int main(int argc, const char *argv[]) {
    const auto filename = "/tmp/x.out";
    size_t nth = argc >= 2 ? atoi(argv[1]) : 1;
    size_t nmsg = argc >= 3 ? atoi(argv[2]) : 1'000'000;
    size_t msg_per_thread = nmsg / nth;

    std::filesystem::remove(std::filesystem::path(filename));
    auto sink = std::make_shared<spdlog::sinks::basic_file_sink_mt>("/tmp/x.out", true);
    spdlog::logger logger("basic", sink);

    StopWatch timer;
    timer.mark();
    std::vector<std::thread> threads;
    for (auto i = 0; i < nth; ++i)
        threads.emplace_back([&]() {
            for (auto i = 0; i < msg_per_thread; ++i) {
                logger.info("This is a log message\n");
                logger.flush();
            }
        });

    for (auto& thread : threads)
        thread.join();

    auto ms = timer.elapsed_duration<std::chrono::milliseconds>().count();
    std::cout << ms << " ms" << std::endl;

    return 0;
}
vqlkdk9b

vqlkdk9b2#

如果你有一个数据量的上限,我会用下面的方法来做:
创建一个文件并将其Map到内存中,例如在linux上使用MAP_SHARED。如果你的程序死了,修改应该写回文件。所有你的日志将做的输出数据将只是一个单一的memcpy。这将是高性能的,保持您的数据崩溃,但需要您有一个上限的Map(并在Map之前调整大小)的文件。您还可以将该文件用作环形缓冲区,如果您同意覆盖旧的日志条目,则可以从头重新启动。

dw1jzc5e

dw1jzc5e3#

一种方法是构建一个类似于现有记录器的记录器,它具有:

  • 每个消息的严重性,因此有些消息是error,有些是trace,依此类推
  • 可按严重性配置刷新,例如“仅在出错时刷新”,可能与定期刷新组合。

如果您从多个线程进行日志记录,那么无论如何都希望日志记录是线程安全的,因此一个线程安全队列和一个消费者线程(用于出队和写入这些数据)是一个好主意。
另一个避免刷新速度减慢的技巧是将日志文件写入RAM磁盘,就像大多数Linux发行版上的/tmp一样。
结果是一个在特定严重性时刷新的记录器。这个严重性是可以调整的,因此,例如,在生产部署的系统上,您只会在错误时刷新,但在开发环境中,您会在调试消息时刷新(如果这不是太慢的话)。
这很有效,特别是如果您采用记录意外值的错误的策略(例如你可以检查你的前置和后置条件,如果它们失败了就记录一个错误)。

相关问题