postgresql 在Rust中使用sqlx进行多线程postgres操作的惯用方法是什么?

xwbd5t1u  于 2023-10-18  发布在  PostgreSQL
关注(0)|答案(1)|浏览(146)

我试图使用线程来卸载三种不同数据类型插入到Postgres数据库中,但在借用检查器上卡住了。
我尝试了几种不同的方法,最后我不知道自己在做什么。所以我试着简化一切,删除了dbm结构的所有属性,并将其设置为每个方法都引用PgPool作为参数,并为每个方法调用创建初始PgPool的克隆。所有方法都有一个&self参数。我还尝试为每个不起作用的操作复制dbm(除了感觉非常笨拙之外)。
我希望这能正常工作,无论是在这里的代码示例中,还是在我使用dbm引用或克隆版本的变体中。相反,我从借用检查器得到以下消息,虽然理解它,但我不知道如何最好地适应它。

76 |           let aton_handle = task::spawn(dbm.insert_aton_data(
   |  _______________________________________^
77 | |             connection_pool.clone(),
78 | |             split_messages.aton_data,
79 | |             &log_id,
80 | |         ));
   | |         ^
   | |         |
   | |_________borrowed value does not live long enough
   |           argument requires that `dbm` is borrowed for `'static`
...

显示违规代码的代码段:

let connection_pool = PgPool::connect(&connection_string)
        .await
        .expect("Failed to connect to Postgres");
    let dbm = DbMethods {};

    // Make API calls etc..

    if let Some(messages) = last_hour.ais_response.ais_latest_responses {
        // TODO: Handle errors.
        let split_messages = process_ais_items(messages).unwrap();

        // TODO: Create thread for each message type and do DB inserts.
        let aton_handle = task::spawn(dbm.insert_aton_data(
            connection_pool.clone(),
            split_messages.aton_data,
            &log_id,
        ));
        // ... other handles.

        let _ = tokio::try_join!(aton_handle, static_handle, position_handle);
    }

方法:

pub async fn insert_aton_data(
        &self,
        db_pool: PgPool,
        aton_data: Vec<AISAtonData>,
        log_id: &Uuid,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // let pool = PgPool::connect(&self.connection_string).await?;
        let tx = db_pool.begin().await?;

        for data in aton_data {
            sqlx::query!(
                "INSERT INTO ais.ais_aton_data (
                    type_field, message_type, mmsi, msgtime, dimension_a, dimension_b, dimension_c, dimension_d,
                    type_of_aids_to_navigation, latitude, longitude, name, type_of_electronic_fixing_device, log_id
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
                data.type_field, data.message_type, data.mmsi, convert_to_datetime_option(data.msgtime), data.dimension_a, data.dimension_b,
                data.dimension_c, data.dimension_d, data.type_of_aids_to_navigation, data.latitude,
                data.longitude, data.name, data.type_of_electronic_fixing_device, log_id
            ).execute(&db_pool).await?;
        }
        tx.commit().await?;

        Ok(())
    }

当前解决方案

我决定放弃将数据库操作作为结构上的方法的模式,而是将它们作为函数,以便继续前进。线程现在可以工作了。希望有人能给我解释一下,我是如何做到把它们作为方法的。
这是我现在结束的代码,以防其他人想做这样的事情。不是说这是一个好主意,它可能是值得看看使用sqlx和Postgres UNNESTING批量插入。

async fn insert_ais_items(connection_pool: PgPool, log_id: Uuid, last_hour: LastHourAISMessage) -> Result<(), Box<dyn Error>> {
    if let Some(messages) = last_hour.ais_response.ais_latest_responses {
        // TODO: Handle errors.
        let split_messages = process_ais_items(messages).unwrap();

        let aton_handle = task::spawn(insert_aton_data(
            connection_pool.clone(),
            split_messages.aton_data,
            log_id.clone(),
        ));
        let static_handle = task::spawn(insert_static_data(
            connection_pool.clone(),
            split_messages.static_data,
            log_id.clone(),
        ));
        let position_handle = task::spawn(insert_position_data(
            connection_pool.clone(),
            split_messages.position_data,
            log_id.clone(),
        ));

        let res = tokio::try_join!(aton_handle, static_handle, position_handle);
        match res {
            Ok(..) => {
                debug!("Threads completed");
            }
            Err(error) => warn!("There was an error in one of the threads: {:?}", error)
        }
    }

    Ok(())
}
jqjz2hbq

jqjz2hbq1#

如果只是方法语法是你所追求的,那么你可以通过让编译器做constant promotion来绕过编译器错误,只是在你声明dbm的地方添加一个借用:

let dbm = &Dbm {};

或者为Dbm实现Copy,并将接收器从&self更改为self
但我真的建议不要使用方法,因为它的缘故,你可以把你的函数分组到一个模块中,并调用它们与模块的名称为一个小的变化,从.::

mod dbm {
    pub async fn insert_aton_data(
        db_pool: PgPool,
        aton_data: Vec<AISAtonData>,
        log_id: Uuid,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 
        //…
    }
}

// and later call it like
let handle = task::spawn(dbm::insert_aton_data(
    connection_pool.clone(),
    split_messages.aton_data,
    log_id.clone(),
));

注意:我还将&Uuid更改为Uuid,因为这可能会导致相同的问题
另一个注意事项:如果将Dbm声明为struct Dbm;,则在创建{}的示例时不必添加{}

相关问题