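I have the async code block below, which runs as part of a larger program. It runs successfully when the DataFrame has 10 or 30 rows, but when I set the row count to a larger number (such as 300), the attempt to write the DataFrame as Parquet panics with a runtime error in each async thread.
Here is an example of a df that it tried to write and failed on: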
df: Ok(shape: (300, 4)
┌───────────────┬─────────┬────────────────────────────────┬───────────────────────────────────────┐
│ timestamp ┆ ticker ┆ bids ┆ asks │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ list[list[f64]] ┆ list[list[f64]] │
╞═══════════════╪═════════╪════════════════════════════════╪═══════════════════════════════════════╡
│ 1674962575119 ┆ ETHUSDT ┆ [[1589.51, 4.731], [1589.31, ┆ [[1590.93, 39.234], [1592.1, 51.... │
│ ┆ ┆ 93.... ┆ │
│ 1674962575220 ┆ ETHUSDT ┆ [[1589.51, 22.094], [1589.31, ┆ [[1590.93, 39.234], [1592.1, 51.... │
│ ┆ ┆ 24... ┆ │
│ 1674962575319 ┆ ETHUSDT ┆ [[1589.51, 12.324], [1589.31, ┆ [[1590.93, 39.309], [1592.1, 52.... │
│ ┆ ┆ 24... ┆ │
│ 1674962575421 ┆ ETHUSDT ┆ [[1589.51, 0.0], [1589.31, ┆ [[1590.93, 26.735], [1592.1, 52.... │
│ ┆ ┆ 24.26... ┆ │
│ ... ┆ ... ┆ ... ┆ ... │
│ 1674962604998 ┆ ETHUSDT ┆ [[1440.0, 5138.446], [1558.38, ┆ [[1617.28, 40.969], [1593.72, 3.... │
│ ┆ ┆ 0... ┆ │
│ 1674962605101 ┆ ETHUSDT ┆ [[1440.0, 5138.446], [1558.38, ┆ [[1617.28, 40.969], [1593.72, 3.... │
│ ┆ ┆ 0... ┆ │
│ 1674962605201 ┆ ETHUSDT ┆ [[1440.0, 5138.446], [1558.38, ┆ [[1617.28, 40.969], [1593.72, 3.... │
│ ┆ ┆ 0... ┆ │
│ 1674962605301 ┆ ETHUSDT ┆ [[1440.0, 5138.446], [1558.38, ┆ [[1617.28, 40.969], [1593.72, 3.... │
│ ┆ ┆ 0... ┆ │
└───────────────┴─────────┴────────────────────────────────┴───────────────────────────────────────┘)
Here is the error:
thread '<unnamed>' panicked at 'range end index 131373 out of range for slice of length 301', C:\Users\username\.cargo\git\checkouts\arrow2-945af624853845da\baa2618\src\io\parquet\write\mod.rs:171:37
thread '<unnamed>' panicked at 'range end index 131373 out of range for slice of length 301', C:\Users\username\.cargo\git\checkouts\arrow2-945af624853845da\baa2618\src\io\parquet\write\mod.rs:171:37
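For readers unfamiliar with this panic message: it is what Rust prints when a slice is indexed with a range whose end lies past the slice's length. The sketch below is not the arrow2 code, only a minimal stand-alone illustration of that class of failure using plain slices. The helper `checked_window` is hypothetical. The observation that 301 equals 300 rows plus one, which is the size an Arrow-style list offsets buffer would have for 300 rows, is an inference from the numbers in the message and is not confirmed by the source.

```rust
use std::ops::Range;

/// Returns the sub-slice covered by `range`, or `None` if the range
/// does not fit inside `data` (instead of panicking).
/// Hypothetical helper for illustration only.
fn checked_window(data: &[i64], range: Range<usize>) -> Option<&[i64]> {
    data.get(range)
}

fn main() {
    // Assumption: 300 list rows need 301 offsets (n + 1), which would
    // match the "slice of length 301" in the panic message.
    let offsets: Vec<i64> = (0..=300).collect();
    assert_eq!(offsets.len(), 301);

    // A range that fits inside the slice is returned as usual.
    assert!(checked_window(&offsets, 0..301).is_some());

    // Writing `&offsets[0..131373]` here would panic with
    // "range end index 131373 out of range for slice of length 301".
    // The checked form reports the problem as `None` instead.
    assert!(checked_window(&offsets, 0..131_373).is_none());
}
```

Since the panic is raised inside the library's Parquet writer, the calling code cannot guard against it this way; the sketch only shows what the message means.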
Here is the code:
async {
    if main_vec.length() >= ROWS {
        let df = main_vec.to_df();
        let time = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        // Build the file name once; `&str + &str` does not compile in Rust.
        let file_name = format!("{time}.parquet");
        let file = std::fs::File::create(&file_name).unwrap();
        println!("Wrote parquet file: {file_name}");
        // ERROR OCCURS HERE
        ParquetWriter::new(file)
            .with_compression(ParquetCompression::Snappy)
            .with_statistics(true)
            .finish(&mut df.collect().unwrap())
            .expect("Failed to write parquet file");
        main_vec.clear();
    }
}.await;
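As a side note on the file-name step in the snippet above: Rust's `+` operator for strings needs an owned `String` on the left-hand side, so `format!` is usually the simpler way to build such a name. A minimal sketch, assuming the name is just the Unix time in seconds followed by `.parquet`; the helper `parquet_file_name` is hypothetical and not part of the original program.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Builds "<unix-seconds>.parquet" for the given point in time.
/// Hypothetical helper, not part of the original program.
fn parquet_file_name(at: SystemTime) -> String {
    let secs = at
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
        .as_secs();
    format!("{secs}.parquet")
}

fn main() {
    // Fixed instant taken from the first timestamp in the example df
    // (1674962575119 ms, truncated to whole seconds).
    let name = parquet_file_name(UNIX_EPOCH + Duration::from_secs(1_674_962_575));
    assert_eq!(name, "1674962575.parquet");

    // In the real program the current time would be passed instead.
    println!("{}", parquet_file_name(SystemTime::now()));
}
```

Because the name has one-second resolution, two writes within the same second would target the same file. That is unrelated to the panic, but may matter when several tasks write concurrently.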
1 answer
This was patched in the latest Polars update.