Rust with Datafusion - trying to write a DataFrame to JSON

Asked by ztyzrc3y on 2023-01-26

I started programming in Rust two days ago and have been stuck on this problem for about three hours.
Any help would be greatly appreciated.
My goal is to write a Datafusion DataFrame out as JSON (the JSON string will eventually be used to respond to HTTP requests in an API).
When you collect() a DataFrame it becomes a Vec of `datafusion::arrow::record_batch::RecordBatch`, and that type is what I'm having trouble converting.
What I've tried:
1. Using `json::writer::record_batches_to_json_rows` from the `arrow` crate, but it fails to compile because "struct `datafusion::arrow::record_batch::RecordBatch` and struct `arrow::record_batch::RecordBatch` have similar names, but are actually distinct types". I was unable to convert between the types to avoid this.
2. Extracting the headers and the values separately from the record batches into Vecs. I was able to extract the headers, but not the values:

let mut header = Vec::new();
    // let mut rows = Vec::new();

    for record_batch in &record_batches { // record_batches: Vec<RecordBatch> from collect()
        // get data
        println!("record_batch.columns: : {:?}", record_batch.columns());
        for col in record_batch.columns() {
            for row in 0..col.len() {
                // println!("Cow: {:?}", col);
                // println!("Row: {:?}", row);
                // let value = col.as_any().downcast_ref::<StringArray>().unwrap().value(row);
                // rows.push(value);
            }
        }
        // get headers
        for field in record_batch.schema().fields() {
            header.push(field.name().to_string());
        }
    };
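If the header/value extraction route works out, the final step (assembling the JSON string) needs no extra crates. A minimal std-only sketch with hypothetical data, independent of the Arrow API (the helper names and sample values are illustrative, not part of Datafusion):

```rust
// Hypothetical illustration (not a DataFusion API): once column headers and
// per-row cell values have been extracted as plain strings, a JSON
// array-of-objects can be assembled with the standard library alone.

/// Escape the characters that must not appear raw inside a JSON string.
fn escape_json(s: &str) -> String {
    s.chars()
        .map(|c| match c {
            '"' => "\\\"".to_string(),
            '\\' => "\\\\".to_string(),
            '\n' => "\\n".to_string(),
            c => c.to_string(),
        })
        .collect()
}

/// Build `[{"header":"value", ...}, ...]` from headers and row values.
fn rows_to_json(header: &[String], rows: &[Vec<String>]) -> String {
    let objects: Vec<String> = rows
        .iter()
        .map(|row| {
            let fields: Vec<String> = header
                .iter()
                .zip(row)
                .map(|(h, v)| format!("\"{}\":\"{}\"", escape_json(h), escape_json(v)))
                .collect();
            format!("{{{}}}", fields.join(","))
        })
        .collect();
    format!("[{}]", objects.join(","))
}

fn main() {
    let header = vec!["heading".to_string(), "value".to_string()];
    let rows = vec![
        vec!["basic".to_string(), "1".to_string()],
        vec!["csv".to_string(), "2".to_string()],
    ];
    // prints [{"heading":"basic","value":"1"},{"heading":"csv","value":"2"}]
    println!("{}", rows_to_json(&header, &rows));
}
```

In practice a crate like `serde_json` (already in the imports below) would handle escaping and nesting more robustly; this only shows the shape of the output.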

Does anyone know how to do this?
The full code:

// datafusion examples: https://github.com/apache/arrow-datafusion/tree/master/datafusion-examples/examples
// datafusion docs: https://arrow.apache.org/datafusion/
use datafusion::prelude::*;
use datafusion::arrow::datatypes::{Schema};

use arrow::json;

// use serde::{ Deserialize };
use serde_json::to_string;

use std::sync::Arc;
use std::str;
use std::fs;
use std::ops::Deref;

type DFResult = Result<Arc<DataFrame>, datafusion::error::DataFusionError>;

struct FinalObject {
    schema: Schema,
    // columns: Vec<Column>,
    num_rows: usize,
    num_columns: usize,
}

// to allow debug logging for FinalObject
impl std::fmt::Debug for FinalObject {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // write!(f, "FinalObject {{ schema: {:?}, columns: {:?}, num_rows: {:?}, num_columns: {:?} }}",
        write!(f, "FinalObject {{ schema: {:?}, num_rows: {:?}, num_columns: {:?} }}",
                // self.schema,  self.columns, self.num_columns, self.num_rows)
                self.schema, self.num_columns, self.num_rows)
    }
}

fn create_or_delete_csv_file(path: String, content: Option<String>, operation: &str) {
    match operation {
        "create" => {
            match content {
                Some(c) => fs::write(path, c.as_bytes()).expect("Problem with writing file!"),
                None => println!("The content is None, no file will be created"),
            }
        }
        "delete" => {
            // Delete the csv file
            fs::remove_file(path).expect("Problem with deleting file!");
        }
        _ => println!("Invalid operation"),
    }
}

async fn read_csv_file_with_inferred_schema(file_name_string: String) -> DFResult {
    // create string csv data
    let csv_data_string = "heading,value\nbasic,1\ncsv,2\nhere,3".to_string();

    // Create a temporary file
    create_or_delete_csv_file(file_name_string.clone(), Some(csv_data_string), "create");

    // Create a session context
    let ctx = SessionContext::new();

    // Register a lazy DataFrame using the context
    let df = ctx.read_csv(file_name_string.clone(), CsvReadOptions::default()).await.expect("An error occurred while reading the CSV string");

    // return the dataframe
    Ok(Arc::new(df))
}

#[tokio::main]
async fn main() {

    let file_name_string = "temp_file.csv".to_string();

    let arc_csv_df = read_csv_file_with_inferred_schema(file_name_string.clone()).await.expect("An error occurred while reading the CSV string (funct: read_csv_file_with_inferred_schema)");

    // have to use ".clone()" each time I want to use this ref
    let deref_df = arc_csv_df.deref();

    // print to console
    deref_df.clone().show().await.expect("An error occurred while showing the CSV DataFrame");

    // collect to vec
    let record_batches = deref_df.clone().collect().await.expect("An error occurred while collecting the CSV DataFrame");
    // println!("Data: {:?}", data);
    
    // record_batches == <Vec<RecordBatch>>. Convert to RecordBatch
    let record_batch = record_batches[0].clone();

    // let json_string = to_string(&record_batch).unwrap();

    // let mut writer = datafusion::json::writer::RecordBatchJsonWriter::new(vec![]);
    // writer.write(&record_batch).unwrap();
    // let json_rows = writer.finish();
    
    let json_rows = json::writer::record_batches_to_json_rows(&[record_batch]);

    println!("JSON: {:?}", json_rows);

    // get final values from recordbatch
    // https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
    // https://users.rust-lang.org/t/how-to-use-recordbatch-in-arrow-when-using-datafusion/70057/2
    // https://github.com/apache/arrow-rs/blob/6.5.0/arrow/src/util/pretty.rs
    // let record_batches_vec = record_batches.to_vec();

    let mut header = Vec::new();
    // let mut rows = Vec::new();

    for record_batch in &record_batches { // record_batches: Vec<RecordBatch> from collect()
        // get data
        println!("record_batch.columns: : {:?}", record_batch.columns());
        for col in record_batch.columns() {
            for row in 0..col.len() {
                // println!("Cow: {:?}", col);
                // println!("Row: {:?}", row);
                // let value = col.as_any().downcast_ref::<StringArray>().unwrap().value(row);
                // rows.push(value);
            }
        }
        // get headers
        for field in record_batch.schema().fields() {
            header.push(field.name().to_string());
        }
    };

    // println!("Header: {:?}", header);
    
    // Delete temp csv
    create_or_delete_csv_file(file_name_string.clone(), None, "delete");
}
Answer from 1zmg4dgp:

I'm not sure Datafusion is the right tool for converting a CSV string to a JSON string, but here is a working version of your code:

#[tokio::main]
async fn main() {
    let file_name_string = "temp_file.csv".to_string();
    let csv_data_string = "heading,value\nbasic,1\ncsv,2\nhere,3".to_string();
    // Create a temporary file
    create_or_delete_csv_file(file_name_string.clone(), Some(csv_data_string), "create");
    // Create a session context
    let ctx = SessionContext::new();
    // Register the csv file
    // the sample CSV includes a header row
    ctx.register_csv("t1", &file_name_string, CsvReadOptions::new().has_header(true))
        .await.unwrap();
    let df = ctx.sql("SELECT * FROM t1").await.unwrap();
    // collect to vec
    let record_batches = df.collect().await.unwrap();
    // get json rows
    let json_rows = datafusion::arrow::json::writer::record_batches_to_json_rows(&record_batches[..]).unwrap();
    println!("JSON: {:?}", json_rows);
    // Delete temp csv
    create_or_delete_csv_file(file_name_string.clone(), None, "delete");
}

If you run into conflicts between the `arrow` and `datafusion` structs, use `datafusion::arrow` rather than the standalone `arrow` crate.
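Concretely, the conflicting import in the question can be swapped for the paths re-exported by Datafusion, so every `RecordBatch` in the program is the same type (exact paths may vary by Datafusion version):

```rust
// Use arrow as re-exported by datafusion so the RecordBatch types match:
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::json::writer::record_batches_to_json_rows;

// instead of pulling in the standalone `arrow` crate, which defines a
// structurally identical but distinct RecordBatch type:
// use arrow::json;
```

With this change the `record_batches_to_json_rows` call accepts the batches returned by `DataFrame::collect` directly.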
