我试图使用线程来卸载三种不同数据类型插入到Postgres数据库中,但在借用检查器上卡住了。
我尝试了几种不同的方法,最后我不知道自己在做什么。所以我试着简化一切,删除了dbm结构的所有属性,并将其设置为每个方法都引用PgPool作为参数,并为每个方法调用创建初始PgPool的克隆。所有方法都有一个&self参数。我还尝试为每个不起作用的操作复制dbm(除了感觉非常笨拙之外)。
我希望这能正常工作,无论是在这里的代码示例中,还是在我使用dbm引用或克隆版本的变体中。相反,我从借用检查器得到以下消息,虽然理解它,但我不知道如何最好地适应它。
76 | let aton_handle = task::spawn(dbm.insert_aton_data(
| _______________________________________^
77 | | connection_pool.clone(),
78 | | split_messages.aton_data,
79 | | &log_id,
80 | | ));
| | ^
| | |
| |_________borrowed value does not live long enough
| argument requires that `dbm` is borrowed for `'static`
...
显示违规代码的代码段:
let connection_pool = PgPool::connect(&connection_string)
.await
.expect("Failed to connect to Postgres");
let dbm = DbMethods {};
// Make API calls etc..
if let Some(messages) = last_hour.ais_response.ais_latest_responses {
// TODO: Handle errors.
let split_messages = process_ais_items(messages).unwrap();
// TODO: Create thread for each message type and do DB inserts.
let aton_handle = task::spawn(dbm.insert_aton_data(
connection_pool.clone(),
split_messages.aton_data,
&log_id,
));
// ... other handles.
let _ = tokio::try_join!(aton_handle, static_handle, position_handle);
}
方法:
pub async fn insert_aton_data(
&self,
db_pool: PgPool,
aton_data: Vec<AISAtonData>,
log_id: &Uuid,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// let pool = PgPool::connect(&self.connection_string).await?;
let tx = db_pool.begin().await?;
for data in aton_data {
sqlx::query!(
"INSERT INTO ais.ais_aton_data (
type_field, message_type, mmsi, msgtime, dimension_a, dimension_b, dimension_c, dimension_d,
type_of_aids_to_navigation, latitude, longitude, name, type_of_electronic_fixing_device, log_id
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
data.type_field, data.message_type, data.mmsi, convert_to_datetime_option(data.msgtime), data.dimension_a, data.dimension_b,
data.dimension_c, data.dimension_d, data.type_of_aids_to_navigation, data.latitude,
data.longitude, data.name, data.type_of_electronic_fixing_device, log_id
).execute(&db_pool).await?;
}
tx.commit().await?;
Ok(())
}
当前解决方案
我决定放弃将数据库操作作为结构上的方法的模式,而是将它们作为函数,以便继续前进。线程现在可以工作了。希望有人能给我解释一下,我是如何做到把它们作为方法的。
这是我现在结束的代码,以防其他人想做这样的事情。不是说这是一个好主意,它可能是值得看看使用sqlx和Postgres UNNESTING批量插入。
async fn insert_ais_items(connection_pool: PgPool, log_id: Uuid, last_hour: LastHourAISMessage) -> Result<(), Box<dyn Error>> {
if let Some(messages) = last_hour.ais_response.ais_latest_responses {
// TODO: Handle errors.
let split_messages = process_ais_items(messages).unwrap();
let aton_handle = task::spawn(insert_aton_data(
connection_pool.clone(),
split_messages.aton_data,
log_id.clone(),
));
let static_handle = task::spawn(insert_static_data(
connection_pool.clone(),
split_messages.static_data,
log_id.clone(),
));
let position_handle = task::spawn(insert_position_data(
connection_pool.clone(),
split_messages.position_data,
log_id.clone(),
));
let res = tokio::try_join!(aton_handle, static_handle, position_handle);
match res {
Ok(..) => {
debug!("Threads completed");
}
Err(error) => warn!("There was an error in one of the threads: {:?}", error)
}
}
Ok(())
}
1条答案
按热度按时间jqjz2hbq1#
如果只是方法语法是你所追求的,那么你可以通过让编译器做constant promotion来绕过编译器错误,只是在你声明
dbm
的地方添加一个借用:或者为
Dbm
实现Copy
,并将接收器从&self
更改为self
。但我真的建议不要使用方法,因为它的缘故,你可以把你的函数分组到一个模块中,并调用它们与模块的名称为一个小的变化,从
.
到::
:注意:我还将
&Uuid
更改为Uuid
,因为这可能会导致相同的问题另一个注意事项:如果将
Dbm
声明为struct Dbm;
,则在创建{}
的示例时不必添加{}
。