如何有效地将数千次RPC调用的结果写入Rust SQLite DB

1tuwyuhd  于 12个月前  发布在  SQLite
关注(0)|答案(1)|浏览(118)

在设置Rust服务器时,我必须进行超过10万次RPC调用。这是服务器重启时的一次性操作。我必须编写这些10万+的结果获取到数据库。我使用rusqlite为我的数据库。并与reqwest进行RPC调用。我希望这是一个RPC操作,因为服务器应该能够通过阅读从数据库处理查询,而初始化过程仍在处理中,可能需要几个小时,因为它正在进行100K RPC调用。
我尝试生成一个时雄task并在其中进行RPC调用:

let handle = tokio::task::spawn(async move {
    let conn = Connection::open("rpc_results.db")?;
    for _ in 0..200000 {
        let res = fetch_rpc_endpoint().await?;
        write_to_db(res, conn)?;
    }
});

字符串
在这里,我在每次RPC调用后都写数据库,这是非常低效的。我可以通过在Vec中保存100个RPC结果并批量写入数据库来优化这一点。
有没有可能进一步优化这个?我在考虑并行RPC调用和并行批量写入数据库,这可能吗?记住数据库的竞争条件。这可能是一个问题,因为我必须打开一个新的连接,同时从部分写入的数据库中读取。
或者任何Rust库组合来实现这一点,因为这似乎是一个标准问题?

kokeuurv

kokeuurv1#

正如评论中所暗示的,这可以通过futures::{Try,}StreamExtbuffer_unorderedtry_chunks相对巧妙地实现:

futures::stream::iter(0..100)
    .map(|_| fetch_rpc_endpoint())
    .buffer_unordered(5) // RPC parallelism
    .try_chunks(20) // DB insert chunk size
    .map(|data| {
        write_to_db(data.map_err(|TryChunksError(_, e)| e)?, &mut conn)?;
        Ok::<_, Infallible>(())
    })
    .try_collect::<()>()
    .await?;

字符串
Playground
备注:

  • 如果你关心顺序,还有buffered而不是buffer_unordered
  • 您可能需要将write_to_db Package 在spawn_blocking中。
  • 外部循环上的tokio::spawn没有实现任何并行/并发。

相关问题