如何多线程同步访问SQLite查询

gcxthw6b  于 2023-04-30  发布在  SQLite
关注(0)|答案(1)|浏览(334)

我在主进程中有一个股票列表,我创建了4个线程,试图将它的每一行传递给多个线程。
根据结果,似乎线程没有同步,并且存在重复连接。
但是我已经使用了互斥锁,并且使用了工厂模式来避免重复的连接名。
我不知道发生了什么事。
多谢了!


main.cpp

QThreadPool threadPool;
threadPool.setMaxThreadCount(4);

QMutex mutex;

while (stockListQuery.next()) {
    QString name_ = stockListQuery.value("name").toString();

    mutex.lock();
    testRunable* query = new testRunable(db, name_);
    threadPool.start(query);
    mutex.unlock();
}
threadPool.waitForDone();

.h

class testRunable : public QRunnable {
public:
    testRunable(QSqlDatabase db, const QString &name);
    void run() override;

private:
    QSqlDatabase db_;
    QMutex m_mutex;
    QString name_;
};

.cc

testRunable::testRunable(QSqlDatabase db, const QString &name): db_(db), name_(name)
{
}

void testRunable::run() {

    QString ids = QString::number((int)QThread::currentThread());
    QSqlDatabase db = DBFactory(ids).getDatabase();
    int iterations = 0;

    while (1) {
        m_mutex.lock();
        qDebug() << name_ << " " << db.connectionName();
        m_mutex.unlock();

        if (iterations >= 1) {
            break; // stop the loop and exit the thread
        }

        iterations++;

        QThread::sleep(1); // seconds
    }

}

更新感谢@user4581301,我修改了下面的互斥锁用法,解决了我的一个问题。哦,上帝。

您可以看到控制台上没有重复的库存。但另一个问题是连接名重复。
我想这和我的工厂模式有关。我把所有的连接都保存在一个Map中,以避免重复连接,我还在弄清楚。
谢谢你们所有人

main.cpp

QThreadPool threadPool;
    threadPool.setMaxThreadCount(4);

    QMutex mutex;

    // Submit the query task to the thread pool
    while (stockListQuery.next()) {
        QString name_ = stockListQuery.value("name").toString();

        testRunable* query = new testRunable(db, name_, mutex); <--- ref of mutex should be passed in the function
        threadPool.start(query);
    }
    threadPool.waitForDone();

对@Mooing Duck来说,你把多个东西放进同一个名字的stockListQuery是什么意思?

.h

class DBFactory {
public:
    QSqlDatabase getDatabase();
    void setConnetName(const QString& connectName);
private:
    QString conName_;
    QMap<QString, QSqlDatabase> m_;
};

class testRunable : public QRunnable {
public:
    testRunable(QSqlDatabase db, const QString &name, QMutex& mutex);
    void run() override;

private:
    DBFactory dbFac_;
    QSqlDatabase db_;
    QMutex& m_mutex;
    QString name_;
};

.cpp

QSqlDatabase DBFactory::getDatabase()
{
    if (m_.contains(conName_)) {
        return m_[conName_];
    }

    // Create a new database connection object
    QSqlDatabase db = QSqlDatabase::addDatabase("QSQLITE", conName_);
    db.setDatabaseName("STOCKLIST.db");

    if (!db.isOpen()) {
        db.open();
    }

    QSqlQuery query(db);
    query.exec("PRAGMA journal_mode=WAL;");
    if (query.next()) {
        QString mode = query.value(0).toString();
        //qDebug() << "Current SQLite mode is:" << mode;
    }

    // save to map
    m_[conName_] = db;
    

    return db;
}

void DBFactory::setConnetName(const QString& connectName)
{
    conName_ = connectName;
}

testRunable::testRunable(QSqlDatabase db, const QString &name, QMutex& mutex): db_(db), name_(name), m_mutex(mutex) {
}

void testRunable::run() {

    
    QString ids = QString::number((int)QThread::currentThread());
    dbFac_.setConnetName(ids);
    QSqlDatabase db = dbFac_.getDatabase();

    m_mutex.lock();
    qDebug() << name_ << " " << db.connectionName();
    stockCrawler(name_);
    saveToTable("result_table", db);
    m_mutex.unlock();

}
t3psigkw

t3psigkw1#

正确的方法是像使用任何其他数据库一样使用SQLite数据库,包括涉及多个线程时。让我解释一下。
在[Using SQLite in multi-threaded applications]中,我们可以看到SQLite可以成为线程安全的:

  • 第1节:概述:
  • 在序列化模式下,SQLite可以安全地由多个线程使用,没有任何限制。*

在序列化模式下,查询是逐个执行的。

  • 第二节:

线程模式的编译时选择:* 使用SQLITE_THREADSAFE编译时参数选择线程模式。如果不存在SQLITE_THREADSAFE编译时参数,则使用序列化模式。*

Qt附带的qsqlite驱动程序使用默认模式编译,即:即序列化。

这对你来说意味着你不必在乎。差不多了
以下示例演示:

  • IMHO打开从多个线程到同一数据库的连接的正确方法是什么:

假设你从主线程打开了一个连接,当一个辅助线程启动时使用QSqlDatabase::clone(重要的是:工作线程内),连接名中包含线程id。
如果检索起来更容易,可以将clone的结果存储到一个thread_local QSqlDatabase database;变量中,该变量在一个.cpp文件中声明。

  • 连接选项QSQLITE_BUSY_TIMEOUT的用法。

这是唯一可能导致查询失败的原因,因为您使用的是多线程环境。

  • 关于查询在执行之前如何等待的规则与std::shared_mutex/QReadWriteLock提供的规则相似(如果不是完全相同的话)(我没有检查代码以查看它是否实际使用了shared_mutex):读查询可以同时执行。

QMutex会让线程相互等待,即使它们不必等待,也会对性能造成太大的损害。
在下面的示例中:

  • 如果缩短QSQLITE_BUSY_TIMEOUTt2将失败(在t1完成之前超时)。
  • 如果注解/删除query.finish();(就在实际测试开始之前),t1将失败。

说明:调用finish()(或销毁QSqlQuery)释放互斥锁;失败+因为query永远不会超出作用域,互斥锁由主线程保存,并且超过t1
请注意,如果t2也可能失败,也可能不失败,这取决于t1何时“让路”。

  • 如果你保留上面的query.finish();注解+你删除了t1内部的lambda体,那么t2将执行,尽管主线程持有互斥量。
#include <QtCore/QCoreApplication>
#include <QtCore/QDir>
#include <QtCore/QThread>
#include <QtCore/QVariant>

#include <QtSql/QSqlDatabase>
#include <QtSql/QSqlError>
#include <QtSql/QSqlField>
#include <QtSql/QSqlQuery>
#include <QtSql/QSqlRecord>

#include <chrono>
#include <thread>

thread_local QSqlDatabase database;

int main(int argc, char** arga)
{
    QCoreApplication a(argc, arga);
    QDir::setCurrent(QCoreApplication::applicationDirPath());

    QSqlDatabase mainDBConnection = QSqlDatabase::addDatabase("QSQLITE", "MainConnection");
    mainDBConnection.setDatabaseName("TestDatabase");
    //We want a long enough timeout for the test.
    mainDBConnection.setConnectOptions("QSQLITE_BUSY_TIMEOUT=30000");
    if (mainDBConnection.open()) {

        QSqlQuery query(mainDBConnection);
        query.exec("CREATE TABLE Test (ID INTEGER)");

        // Proceed with the test if above query is OK
        // or if the table was created during a previous test.
        if (auto error = query.lastError();
            error.type() == QSqlError::NoError || (
                error.type() == QSqlError::ErrorType::StatementError &&
                error.text() == "table Test already exists Unable to execute statement"
                )
            ) {
            query.exec("DELETE FROM Test");
            query.finish(); //If you remove this line, the insertion done in t1 will not work.

            { //Start of the actual test.
                std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
                // t1 executes a long-running INSERT query.
                std::thread t1([begin]() {
                    QString dbName = QString("t%1").arg(reinterpret_cast<intptr_t>(QThread::currentThreadId()));
                    database = QSqlDatabase::cloneDatabase("MainConnection", dbName);
                    database.open();
                    QSqlQuery query(database);
                    query.exec(
                        "WITH RECURSIVE r(i) AS (" \
                        "VALUES(0) " \
                        "UNION ALL " \
                        "SELECT 1+i FROM r " \
                        "LIMIT 10000000 " \
                        ") " \
                        "INSERT INTO Test SELECT i FROM r"
                    );
                    if (auto error = query.lastError();
                        error.type() != QSqlError::NoError
                        )
                        qDebug() << "Error in t1: " << error.text();
                    query.finish();
                    database.close();
                    std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
                    qDebug() << "t1 finished at: " <<
                        std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count() << "ms";
                    });

                // t2 executes a SELECT query (after t1 has started).
                std::thread t2([begin]() {
                    // Wait to ensure t1 has started.
                    std::this_thread::sleep_for(std::chrono::seconds(1));
                    QString dbName = QString("t%1").arg(reinterpret_cast<intptr_t>(QThread::currentThreadId()));
                    database = QSqlDatabase::cloneDatabase("MainConnection", dbName);
                    database.open();
                    QSqlQuery query(database);
                    query.exec("SELECT COUNT(*) FROM Test");
                    if (query.first())
                        qDebug() << "Selected: " << query.record().field(0).value().toInt();
                    else
                        qDebug() << "Error in t2: " << query.lastError().text();
                    query.finish();
                    database.close();
                    std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
                    qDebug() << "t2 finished at: " <<
                        std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count() << "ms";
                    });

                t1.join();
                t2.join();
            } //End of the actual test.
        }
        else
            qDebug() << "Test aborted because of error: " << error.type() << error.text();

        // Back to the main thread, we check the value returned by t2
        // counted all the records after t1 finished inserting.
        query.exec("SELECT COUNT(*) FROM Test");
        if (query.first())
            qDebug() << "Expected result from t2: " << query.record().field(0).value().toInt();
        query.finish();
        mainDBConnection.close();
    }
    return 0;
}

相关问题