Converting CSV to Apache Arrow in Rust

wljmcqd8  posted on 2023-06-30  in Apache

I need to convert a CSV file to Apache Arrow.
Here is the structure of my CSV file (the real file has many more rows than this excerpt):

Date,Value,High,Low,Entry
1209920400,1413.50,1413.50,1412.75,1413.00
1209920580,1413.25,1414.00,1413.25,1413.75
1209921240,1413.75,1414.00,1413.25,1413.50
1209921300,1413.25,1413.25,1413.00,1413.00
1209921600,1413.25,1413.25,1412.75,1412.75
1209921780,1413.00,1413.00,1413.00,1413.00
1209921900,1413.00,1413.00,1412.75,1412.75
1209921960,1412.50,1412.50,1412.50,1412.50
1209922800,1412.75,1412.75,1412.75,1412.75
1209923100,1412.75,1413.50,1412.75,1413.25
1209923400,1412.75,1412.75,1412.50,1412.50
1209926940,1413.75,1414.00,1413.50,1413.50
1209930420,1413.75,1414.25,1413.75,1414.00

So far I have written this code to infer the schema and create the Arrow file:

use arrow::{
    error::ArrowError,
    csv::ReaderBuilder,
    ipc::writer::FileWriter
};
use std::sync::Arc;
use std::{fs::File};

fn main() -> Result<(), ArrowError> {

    let input = "my_data.csv";
    let output = "my_data.arrow";
    let delimiter: u8 = b',';
    let max_read_records: Option<usize> = Some(100);
    let has_header = true;

    let schema = arrow_csv::reader::infer_schema_from_files(&[input.to_string()], delimiter, max_read_records, has_header).unwrap();

    println!("{:?}", schema);

    let file = File::open(input).unwrap();
    let csv_reader = ReaderBuilder::new(Arc::new(schema)).build(file).unwrap();

    let mut writer = FileWriter::try_new(File::create(output)?, csv_reader.schema().as_ref())?;

    for batch in csv_reader {
        match batch {
            Ok(batch) => writer.write(&batch)?,
            Err(error) => return Err(error),
        }
    }

    let _ = writer.finish();

    Ok(())
}

The code compiles, and running it produces two outputs.
1. It prints the schema to the console:

Schema {
  fields:[
    Field { name: "Date", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
    Field { name: "Value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
    Field { name: "High", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
    Field { name: "Low", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
    Field { name: "Entry", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }
  ],
  metadata: {} 
}

2. It prints an error to the console:

Error: ParseError("Error while parsing value Date for column 0 at line 0")

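First of all, the inferred schema looks correct to me, but I don't understand the error. Why can it infer a correct schema, yet fail to parse some of the values right afterwards?
No matter what I try, I can't get rid of the error or figure out what is actually going wrong. I tried reducing my CSV file to fewer rows and/or a simpler schema, and the problem stays the same.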


hzbexzde1#

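This happens because, by default, ReaderBuilder expects the input to contain only CSV data, i.e. no header row. Schema inference was told has_header = true and skipped the first line, but the reader was not, so it tries to parse the header text "Date" as an Int64 value. That is exactly what the error reports for column 0 at line 0.
You can use .has_header(true) on the builder to state explicitly that the CSV data does have a header.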
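To make the mismatch concrete, here is a minimal std-only sketch of the same failure mode. It does not use the arrow crate at all: `parse_first_column` is a hypothetical helper written for this illustration, and its error text is my own, not the library's. It only mimics the idea that a typed reader which is not told about the header will try to parse the header line as data.

```rust
// Std-only illustration; `parse_first_column` is a hypothetical helper,
// not part of the arrow crate.
fn parse_first_column(csv: &str, has_header: bool) -> Result<Vec<i64>, String> {
    // Skip the first line only when the caller says it is a header.
    let skip = if has_header { 1 } else { 0 };
    csv.lines()
        .enumerate() // keep the original line numbers for error messages
        .skip(skip)
        .map(|(line_no, line)| {
            // Take the first comma-separated field of the line.
            let field = line.split(',').next().unwrap_or("");
            field
                .parse::<i64>()
                .map_err(|_| format!("cannot parse {:?} as i64 at line {}", field, line_no))
        })
        .collect()
}

fn main() {
    let csv = "Date,Value\n1209920400,1413.50\n1209920580,1413.25\n";

    // Header treated as data: "Date" is not an integer, so this is an Err for line 0.
    println!("{:?}", parse_first_column(csv, false));

    // Header skipped: both timestamps parse.
    println!("{:?}", parse_first_column(csv, true));
}
```

The first call fails on the header line and the second succeeds, which mirrors what happens when schema inference and the reader disagree about whether a header is present.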
Here is the complete code:

use arrow::{
    error::ArrowError,
    csv::ReaderBuilder,
    ipc::writer::FileWriter
};
use std::fs::File;
use std::sync::Arc;

fn main() -> Result<(), ArrowError> {

    let input = "my_data.csv";
    let output = "my_data.arrow";
    let delimiter: u8 = b',';
    let max_read_records: Option<usize> = Some(100);
    let has_header = true;

    // Infer the schema from the first `max_read_records` rows, skipping the header row.
    let schema = arrow_csv::reader::infer_schema_from_files(&[input.to_string()], delimiter, max_read_records, has_header)?;

    println!("{:?}", schema);

    let file = File::open(input)?;
    // Tell the reader about the header as well, using the same flag as schema inference.
    let csv_reader = ReaderBuilder::new(Arc::new(schema)).has_header(has_header).build(file)?;

    let mut writer = FileWriter::try_new(File::create(output)?, csv_reader.schema().as_ref())?;

    for batch in csv_reader {
        match batch {
            Ok(batch) => writer.write(&batch)?,
            Err(error) => return Err(error),
        }
    }

    // Propagate any error from finishing the file instead of silently discarding it.
    writer.finish()?;

    Ok(())
}
