I started coding in Rust 2 days ago and have been trying to figure this out for about 3 hours now...
Any help would be greatly appreciated.
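My goal is to convert a DataFusion DataFrame to JSON (this will eventually be used to answer HTTP requests in an API with a JSON string).
When you collect the data, the DataFrame turns into a Vec of "datafusion::arrow::record_batch::RecordBatch", and that data type is what I'm having trouble converting.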
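Because collect() returns a Vec of column-oriented batches (and it can contain more than one batch), producing the JSON an API would send back means walking every batch, not just record_batches[0], and turning each row index into one object. The std-only sketch below uses a hypothetical MockBatch type in place of Arrow's RecordBatch, with cells already encoded as JSON literals; it only illustrates the columns-to-rows transposition and is not DataFusion's or Arrow's API.

```rust
// Hypothetical stand-in for an Arrow RecordBatch: data stored column by column.
// Each cell is assumed to be an already-encoded JSON literal (e.g. "\"basic\"" or "1").
struct MockBatch {
    columns: Vec<Vec<String>>, // columns[c][r] = cell of column c, row r
}

// Quote a header for use as a JSON object key, escaping '"', '\' and
// control characters (U+0000..U+001F) as JSON requires.
fn json_key(name: &str) -> String {
    let mut out = String::from("\"");
    for ch in name.chars() {
        match ch {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

// Turn every batch (not only the first) into row objects and join them
// into a single JSON array string.
fn batches_to_json_array(headers: &[String], batches: &[MockBatch]) -> String {
    let mut rows = Vec::new();
    for batch in batches {
        let num_rows = batch.columns.first().map_or(0, |c| c.len());
        for r in 0..num_rows {
            let fields: Vec<String> = headers
                .iter()
                .zip(&batch.columns)
                .map(|(h, col)| format!("{}:{}", json_key(h), col[r]))
                .collect();
            rows.push(format!("{{{}}}", fields.join(",")));
        }
    }
    format!("[{}]", rows.join(","))
}

// The CSV data from the question, deliberately split across two batches.
fn example_output() -> String {
    let headers = vec!["heading".to_string(), "value".to_string()];
    let batches = vec![
        MockBatch {
            columns: vec![
                vec!["\"basic\"".to_string(), "\"csv\"".to_string()],
                vec!["1".to_string(), "2".to_string()],
            ],
        },
        MockBatch {
            columns: vec![vec!["\"here\"".to_string()], vec!["3".to_string()]],
        },
    ];
    batches_to_json_array(&headers, &batches)
}
```

example_output() yields one object per CSV row across both batches. In real code, serde_json is a sturdier way to encode cell values than hand-rolled escaping.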
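I've tried:
1. Using json::writer::record_batches_to_json_rows from Arrow, but this isn't allowed because "struct datafusion::arrow::record_batch::RecordBatch and struct arrow::record_batch::RecordBatch have similar names, but are actually distinct types". I couldn't convert between the types to get around this.
2. Extracting the headers and the values into separate Vecs while looping over the record batches. I managed to extract the headers, but not the values: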
let mut header = Vec::new();
// let mut rows = Vec::new();
for record_batch in &record_batches {
    // get data
    println!("record_batch.columns: {:?}", record_batch.columns());
    for col in record_batch.columns() {
        for row in 0..col.len() {
            // println!("Col: {:?}", col);
            // println!("Row: {:?}", row);
            // let value = col.as_any().downcast_ref::<StringArray>().unwrap().value(row);
            // rows.push(value);
        }
    }
    // get headers
    for field in record_batch.schema().fields() {
        header.push(field.name().to_string());
    }
}
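Arrow columns are type-erased (RecordBatch::columns() gives ArrayRef, i.e. Arc<dyn Array>), so the commented-out downcast_ref::<StringArray>().unwrap() can only succeed for string columns. The value column in the sample CSV is most likely inferred as an integer type (Int64), where that unwrap would panic. The std-only sketch below mimics the pattern with hypothetical StringArray/Int64Array stand-ins built on std::any::Any: try each concrete type in turn and fall back instead of unwrapping. With real Arrow you would downcast to arrow's own array types and read cells with their value(row) methods; check which data types your schema actually reports.

```rust
use std::any::Any;

// Hypothetical stand-ins for arrow's StringArray / Int64Array, just enough
// to show the as_any() + downcast_ref() pattern from the question.
trait Array {
    fn as_any(&self) -> &dyn Any;
    fn len(&self) -> usize;
}

struct StringArray(Vec<String>);
struct Int64Array(Vec<i64>);

impl Array for StringArray {
    fn as_any(&self) -> &dyn Any { self }
    fn len(&self) -> usize { self.0.len() }
}

impl Array for Int64Array {
    fn as_any(&self) -> &dyn Any { self }
    fn len(&self) -> usize { self.0.len() }
}

// Render one cell as text by trying each concrete type, instead of
// unconditionally unwrapping a StringArray downcast.
fn cell_to_string(col: &dyn Array, row: usize) -> Option<String> {
    if let Some(a) = col.as_any().downcast_ref::<StringArray>() {
        Some(a.0[row].clone())
    } else if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
        Some(a.0[row].to_string())
    } else {
        None // unsupported column type: report it rather than panic
    }
}

// Transpose column-oriented data into rows (one Vec<String> per row).
fn columns_to_rows(columns: &[Box<dyn Array>]) -> Vec<Vec<String>> {
    let num_rows = columns.first().map_or(0, |c| c.len());
    (0..num_rows)
        .map(|r| {
            columns
                .iter()
                .map(|c| cell_to_string(&**c, r).unwrap_or_default())
                .collect()
        })
        .collect()
}
```

Filling the rows Vec this way, alongside the headers, gives everything needed to build one JSON object per row.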
Does anyone know how to do this?
Here is the full code:
// datafusion examples: https://github.com/apache/arrow-datafusion/tree/master/datafusion-examples/examples
// datafusion docs: https://arrow.apache.org/datafusion/
use datafusion::prelude::*;
use datafusion::arrow::datatypes::{Schema};
use arrow::json;
// use serde::{ Deserialize };
use serde_json::to_string;
use std::sync::Arc;
use std::str;
use std::fs;
use std::ops::Deref;
type DFResult = Result<Arc<DataFrame>, datafusion::error::DataFusionError>;
struct FinalObject {
    schema: Schema,
    // columns: Vec<Column>,
    num_rows: usize,
    num_columns: usize,
}
// to allow debug logging for FinalObject
impl std::fmt::Debug for FinalObject {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // write!(f, "FinalObject {{ schema: {:?}, columns: {:?}, num_rows: {:?}, num_columns: {:?} }}",
        write!(f, "FinalObject {{ schema: {:?}, num_rows: {:?}, num_columns: {:?} }}",
            // self.schema, self.columns, self.num_rows, self.num_columns)
            self.schema, self.num_rows, self.num_columns)
    }
}
fn create_or_delete_csv_file(path: String, content: Option<String>, operation: &str) {
    match operation {
        "create" => {
            match content {
                Some(c) => fs::write(path, c.as_bytes()).expect("Problem with writing file!"),
                None => println!("The content is None, no file will be created"),
            }
        }
        "delete" => {
            // Delete the csv file
            fs::remove_file(path).expect("Problem with deleting file!");
        }
        _ => println!("Invalid operation"),
    }
}
async fn read_csv_file_with_inferred_schema(file_name_string: String) -> DFResult {
    // create string csv data
    let csv_data_string = "heading,value\nbasic,1\ncsv,2\nhere,3".to_string();
    // Create a temporary file
    create_or_delete_csv_file(file_name_string.clone(), Some(csv_data_string), "create");
    // Create a session context
    let ctx = SessionContext::new();
    // Register a lazy DataFrame using the context
    let df = ctx.read_csv(file_name_string.clone(), CsvReadOptions::default()).await.expect("An error occurred while reading the CSV string");
    // return the dataframe
    Ok(Arc::new(df))
}
#[tokio::main]
async fn main() {
    let file_name_string = "temp_file.csv".to_string();
    let arc_csv_df = read_csv_file_with_inferred_schema(file_name_string.clone()).await.expect("An error occurred while reading the CSV string (funct: read_csv_file_with_inferred_schema)");
    // have to use ".clone()" each time I want to use this ref
    let deref_df = arc_csv_df.deref();
    // print to console
    deref_df.clone().show().await.expect("An error occurred while showing the CSV DataFrame");
    // collect to vec
    let record_batches = deref_df.clone().collect().await.expect("An error occurred while collecting the CSV DataFrame");
    // println!("Data: {:?}", data);
    // record_batches is a Vec<RecordBatch>; take the first batch
    let record_batch = record_batches[0].clone();
    // let json_string = to_string(&record_batch).unwrap();
    // let mut writer = datafusion::json::writer::RecordBatchJsonWriter::new(vec![]);
    // writer.write(&record_batch).unwrap();
    // let json_rows = writer.finish();
    let json_rows = json::writer::record_batches_to_json_rows(&[record_batch]);
    println!("JSON: {:?}", json_rows);
    // get final values from recordbatch
    // https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
    // https://users.rust-lang.org/t/how-to-use-recordbatch-in-arrow-when-using-datafusion/70057/2
    // https://github.com/apache/arrow-rs/blob/6.5.0/arrow/src/util/pretty.rs
    // let record_batches_vec = record_batches.to_vec();
    let mut header = Vec::new();
    // let mut rows = Vec::new();
    for record_batch in &record_batches {
        // get data
        println!("record_batch.columns: {:?}", record_batch.columns());
        for col in record_batch.columns() {
            for row in 0..col.len() {
                // println!("Col: {:?}", col);
                // println!("Row: {:?}", row);
                // let value = col.as_any().downcast_ref::<StringArray>().unwrap().value(row);
                // rows.push(value);
            }
        }
        // get headers
        for field in record_batch.schema().fields() {
            header.push(field.name().to_string());
        }
    }
    // println!("Header: {:?}", header);
    // Delete temp csv
    create_or_delete_csv_file(file_name_string.clone(), None, "delete");
}
1 answer
I'm not sure DataFusion is the ideal place to convert a CSV string into a JSON string, but here is a working version of your code:
If you run into conflicts between arrow and datafusion structs, use datafusion::arrow instead of just the arrow crate.