数据融合中的索引

Indexing in datafusion

上下文:我正在使用 datafusion 为 csv 文件输入构建数据验证器。

需求:我想在输出报告中添加错误发生的行号。在 pandas 中,我能够添加可用于此目的的行索引。有没有办法在数据融合中实现类似的结果。

打开 CSV 文件后,似乎没有任何简单的方法可以在 datafusion 中执行此操作。但是您可以直接使用 arrow 打开 CSV 文件,生成一个包含索引列的新 RecordBatch,然后使用 MemTable 将其提供给数据融合。这是假设我们只处理一批的例子......

use datafusion::prelude::*;
use datafusion::datasource::MemTable;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;
use arrow::array::{UInt32Array, Int64Array};
use arrow::datatypes::{Schema, Field, DataType};
use arrow::csv;

use std::fs::File;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {


    let schema = Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]);
    
    let file = File::open("tests/example.csv")?;
    
    let mut csv = csv::Reader::new(file, Arc::new(schema), true, None, 1024, None, None);
    let batch = csv.next().unwrap()?;

    let length = batch.num_rows() as u32;
    let idx_array = UInt32Array::from((0..length).collect::<Vec<u32>>());
    let a_array = Int64Array::from(batch.column(0).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec());
    let b_array = Int64Array::from(batch.column(1).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec());
    let new_schema = Schema::new(vec![
        Field::new("idx", DataType::UInt32, true),
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]);

    let new_batch = RecordBatch::try_new(Arc::new(new_schema),
        vec![Arc::new(idx_array), Arc::new(a_array), Arc::new(b_array)])?;
    let mem_table = MemTable::try_new(new_batch.schema(), vec![vec![new_batch]])?;
    
    let mut ctx = ExecutionContext::new();

    // create the dataframe
    let df = ctx.read_table(Arc::new(mem_table))?;

    let results = df.collect().await?;

    print_batches(&results).unwrap();

    // do whatever you need to do
    // do whatever you need to do
    // do whatever you need to do
    
    Ok(())
}

我的example.csv看起来像这样...

a,b
1,2
1,3
4,2
2,6
3,7

输出应该是...

+-----+---+---+
| idx | a | b |
+-----+---+---+
| 0   | 1 | 2 |
| 1   | 1 | 3 |
| 2   | 4 | 2 |
| 3   | 2 | 6 |
| 4   | 3 | 7 |
+-----+---+---+

不过,如果您真的只是想在 python 中寻找具有 pandas 等功能的 crate,我建议您查看 polars

鉴于 Ian Grahams 的结帐建议 polars,我想我也举了一个例子来说明如何在极地中实现这一点:

use polars::prelude::*;
use std::io::Cursor;

fn main() -> Result<()> {

    // use an in memory repr for the csv
    let csv = Cursor::new(
        "a,b
1,2
1,3
4,2
2,6
3,7
",
    );

    // parse the csv into a DataFrame
    let mut df = CsvReader::new(csv).finish()?;

    // create the index column based on the dataframes height
    // note that we use the `NoNull` wrapper to create from `T` instead of `Option<T>`
    let mut idx: NoNull<UInt32Chunked> = (0..df.height() as u32).collect();
    idx.rename("idx");

    // add the index column to the DataFrame
    df.insert_at_idx(0, idx.into_inner().into_series())?;


    // print output
    dbg!(df);

    Ok(())
}

输出:

+-----+-----+-----+
| idx | a   | b   |
| --- | --- | --- |
| u32 | i64 | i64 |
+=====+=====+=====+
| 0   | 1   | 2   |
+-----+-----+-----+
| 1   | 1   | 3   |
+-----+-----+-----+
| 2   | 4   | 2   |
+-----+-----+-----+
| 3   | 2   | 6   |
+-----+-----+-----+
| 4   | 3   | 7   |
+-----+-----+-----+