postgresql 使用Sqlx和Rust在卸除时卸除数据库

r3i60tvu  于 2022-11-23  发布在  PostgreSQL
关注(0)|答案(1)|浏览(136)

到目前为止,我已经完成了第3章的最后部分,在这部分中,我们设置了一些集成测试,并 * 在每次测试运行 * 中创建了一个数据库。
我想扩展这个功能,并添加在每次测试运行后删除数据库的功能。这将我引向Drop特征,在该特征中,我将执行DROP DATABASE命令并结束它(我将存储数据库名称及其连接/池,以便以后访问它)。
但问题是,应用程序在执行DROP DATABASE查询时挂起,并在60秒后超时。我不知道是什么原因导致的,因为我无法打印或调试连接。
下面是我的代码:

use futures::executor;
use sqlx::{Connection, Executor, PgConnection, PgPool};
use std::net::TcpListener;
use uuid::Uuid;
use zero2prod::configuration::{get_configuration, DatabaseSettings};

const BASE_URL: &str = "127.0.0.1";
pub struct TestApp {
    db_name: String,
    connection_string: String,
    pub address: String,
    pub db_pool: PgPool,
    pub connection: PgConnection,
}

/**
 * We need to refactor our project into a library and a binary: all our logic will live in the library crate
while the binary itself will be just an entrypoint with a very slim main function
 */

pub async fn init(url: &str) -> TestApp {
    let mut app = spawn_app().await;
    app.address = format!("{}{}", app.address, url);
    return app;
}

// Launch our application in the background ~somehow~
async fn spawn_app() -> TestApp {
    // We take the BASE_URL const and assign it a port 0. We then
    // pass the listener to the server
    let base_url = format!("{}:0", BASE_URL);
    let listener = TcpListener::bind(base_url).expect("Failed to bind random port");

    // We retrieve the port assigned by the OS
    let port = listener.local_addr().unwrap().port();

    let (connection, db_connection, db_name, connection_string) = init_db().await;

    // We pass the port now to our server
    let server = zero2prod::run(listener, db_connection.clone()).expect("Failed to bind address");
    let _ = actix_web::rt::spawn(server);
    let address = format!("http://{}:{}", BASE_URL, port);

    TestApp {
        db_name: String::from(db_name),
        address,
        db_pool: db_connection,
        connection,
        connection_string,
    }
}

async fn init_db() -> (PgConnection, PgPool, String, String) {
    let mut configuration = get_configuration().expect("Failed to read configuration");

    // We change the db name in each run as we need to run the test multiple times
    configuration.database.database_name = Uuid::new_v4().to_string();

    let (connection, pool) = configure_database(&configuration.database).await;

    return (
        connection,
        pool,
        String::from(&configuration.database.database_name),
        configuration.database.connection_string_without_db(),
    );
}

async fn configure_database(config: &DatabaseSettings) -> (PgConnection, PgPool) {

   // The following returns:
   //   format!(
   //       "postgres://{}:{}@{}:{}",
   //       self.username, self.password, self.host, self.port
   //   )
    let mut connection = PgConnection::connect(&config.connection_string_without_db())
        .await
        .expect("Failed to connect to Postgres.");

    connection
        .execute(format!(r#"CREATE DATABASE "{}""#, config.database_name).as_str())
        .await
        .expect("Failed to create the db");

    // Migrate the database

    let connection_pool = PgPool::connect(&config.connection_string())
        .await
        .expect("Failed to connect to Postgres");

    sqlx::migrate!("./migrations")
        .run(&connection_pool)
        .await
        .expect("Failed to migrate db");

    return (connection, connection_pool);

入口点是init()函数,它基本上返回一个TestApp结构(最初包含db_pooladdress字段)。
问题出在下面我试过的方法如下:
1.使用Smol的运行时在drop中运行异步-尝试初始化到Postgres数据库的新连接

impl Drop for TestApp {
    fn drop(&mut self) {
        smol::block_on(async {
            let mut connection = PgConnection::connect(&self.connection_string)
                .await
                .expect("Failed to connect to Postgres.");

            let result = connection
                .execute(format!(r#"DROP DATABASE "{}""#, self.db_name).as_str())
                .await
                .expect("Error while querying the drop database");
            println!("{:?}", result);
        });
    }
}

1.使用Smol的运行时在drop中运行异步-已尝试使用现有db_pool

fn drop(&mut self) {
        smol::block_on(async {
            let result = self
                .db_pool
                .execute(format!(r#"DROP DATABASE "{}""#, self.db_name).as_str())
                .await.expect("Error while querying");
            println!("{:?}", result);
        });
    }

1.使用Future的crate执行器-使用现有的db_pool

let result = executor::block_on(
            self.db_pool
                .execute(format!(r#"DROP DATABASE "{}""#, self.db_name).as_str()),
        )
        .expect("Failed to drop database");

        println!("{:?}", result);

1.使用Future的crate执行器-运行db_pool.acquire(),然后运行池(这在db_pool.acquire处挂起。

executor::block_on(self.db_pool.acquire()).expect("Failed to acquire pool");
     let result = executor::block_on(
            self.db_pool
                .execute(format!(r#"DROP DATABASE "{}""#, self.db_name).as_str()),
        )
        .expect("Failed to drop database");

        println!("{:?}", result);

1.使用Future的crate执行器-运行现有连接。

let result = executor::block_on(
            self.connection
                .execute(format!(r#"DROP DATABASE "{}""#, self.db_name).as_str()),
        )
        .expect("Failed to drop database");

        println!("{:?}", result);

请注意,代码并不是最漂亮的,因为我试图首先找到一个有效的解决方案。
不幸的是,我不知道问题是什么,因为没有抛出错误。
有什么想法吗?

20jt8wwn

20jt8wwn1#

我遇到了同样的问题。多亏了这里的一些答案,我最终得到了下面的解决方案(注意,我为以“test_db-"开头的数据库名称添加了一个Assert,因此您必须在前面添加数据库名称或删除该Assert):

pub struct TestApp {
    pub address: String,
    pub db_pool: PgPool,
    pub db_name: String,
    pub db_connection_string: String,
}

impl TestApp {
    async fn terminate(&mut self) {
        assert!(self.db_name.starts_with("test_db-"));
        println!("Cleaning up database: {}", self.db_name);
        self.db_pool.close().await;
        let mut connection = PgConnection::connect(&self.db_connection_string)
            .await
            .expect("Failed to connect to Postgres");

        // Force drop all active connections to database
        // TODO: see if there is a softer way to handle this (i.e. close connection when DB access is complete)
        connection
            .execute(
                format!(
                    r#"
                    SELECT pg_terminate_backend(pg_stat_activity.pid)
                    FROM pg_stat_activity
                    WHERE pg_stat_activity.datname = '{}'
                    AND pid <> pg_backend_pid()
                    "#,
                    self.db_name
                )
                .as_str(),
            )
            .await
            .expect("Failed to terminate current connections to test db");

        connection
            .execute(format!(r#"DROP DATABASE "{}";"#, self.db_name).as_str())
            .await
            .expect("Failed to drop database.");
        println!("Database cleaned up successfully.")
    }
}

impl Drop for TestApp {
    fn drop(&mut self) {
        std::thread::scope(|s| {
            s.spawn(|| {
                let runtime = tokio::runtime::Builder::new_multi_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                runtime.block_on(self.terminate());
            });
        });
    }
}

相关问题