数据融合中的索引
Indexing in datafusion
上下文:我正在使用 datafusion 为 csv 文件输入构建数据验证器。
需求:我想在输出报告中添加错误发生的行号。在 pandas 中,我能够添加可用于此目的的行索引。有没有办法在数据融合中实现类似的结果。
打开 CSV 文件后,似乎没有任何简单的方法可以在 datafusion 中执行此操作。但是您可以直接使用 arrow 打开 CSV 文件,生成一个包含索引列的新 RecordBatch,然后使用 MemTable 将其提供给数据融合。这是假设我们只处理一批的例子......
use datafusion::prelude::*;
use datafusion::datasource::MemTable;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;
use arrow::array::{UInt32Array, Int64Array};
use arrow::datatypes::{Schema, Field, DataType};
use arrow::csv;
use std::fs::File;
use std::sync::Arc;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int64, false),
Field::new("b", DataType::Int64, false),
]);
let file = File::open("tests/example.csv")?;
let mut csv = csv::Reader::new(file, Arc::new(schema), true, None, 1024, None, None);
let batch = csv.next().unwrap()?;
let length = batch.num_rows() as u32;
let idx_array = UInt32Array::from((0..length).collect::<Vec<u32>>());
let a_array = Int64Array::from(batch.column(0).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec());
let b_array = Int64Array::from(batch.column(1).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec());
let new_schema = Schema::new(vec![
Field::new("idx", DataType::UInt32, true),
Field::new("a", DataType::Int64, false),
Field::new("b", DataType::Int64, false),
]);
let new_batch = RecordBatch::try_new(Arc::new(new_schema),
vec![Arc::new(idx_array), Arc::new(a_array), Arc::new(b_array)])?;
let mem_table = MemTable::try_new(new_batch.schema(), vec![vec![new_batch]])?;
let mut ctx = ExecutionContext::new();
// create the dataframe
let df = ctx.read_table(Arc::new(mem_table))?;
let results = df.collect().await?;
print_batches(&results).unwrap();
// do whatever you need to do
// do whatever you need to do
// do whatever you need to do
Ok(())
}
我的example.csv看起来像这样...
a,b
1,2
1,3
4,2
2,6
3,7
输出应该是...
+-----+---+---+
| idx | a | b |
+-----+---+---+
| 0 | 1 | 2 |
| 1 | 1 | 3 |
| 2 | 4 | 2 |
| 3 | 2 | 6 |
| 4 | 3 | 7 |
+-----+---+---+
不过,如果您真的只是想在 python 中寻找具有 pandas 等功能的 crate,我建议您查看 polars。
鉴于 Ian Grahams 的结帐建议 polars,我想我也举了一个例子来说明如何在极地中实现这一点:
use polars::prelude::*;
use std::io::Cursor;
fn main() -> Result<()> {
// use an in memory repr for the csv
let csv = Cursor::new(
"a,b
1,2
1,3
4,2
2,6
3,7
",
);
// parse the csv into a DataFrame
let mut df = CsvReader::new(csv).finish()?;
// create the index column based on the dataframes height
// note that we use the `NoNull` wrapper to create from `T` instead of `Option<T>`
let mut idx: NoNull<UInt32Chunked> = (0..df.height() as u32).collect();
idx.rename("idx");
// add the index column to the DataFrame
df.insert_at_idx(0, idx.into_inner().into_series())?;
// print output
dbg!(df);
Ok(())
}
输出:
+-----+-----+-----+
| idx | a | b |
| --- | --- | --- |
| u32 | i64 | i64 |
+=====+=====+=====+
| 0 | 1 | 2 |
+-----+-----+-----+
| 1 | 1 | 3 |
+-----+-----+-----+
| 2 | 4 | 2 |
+-----+-----+-----+
| 3 | 2 | 6 |
+-----+-----+-----+
| 4 | 3 | 7 |
+-----+-----+-----+
上下文:我正在使用 datafusion 为 csv 文件输入构建数据验证器。
需求:我想在输出报告中添加错误发生的行号。在 pandas 中,我能够添加可用于此目的的行索引。有没有办法在数据融合中实现类似的结果。
打开 CSV 文件后,似乎没有任何简单的方法可以在 datafusion 中执行此操作。但是您可以直接使用 arrow 打开 CSV 文件,生成一个包含索引列的新 RecordBatch,然后使用 MemTable 将其提供给数据融合。这是假设我们只处理一批的例子......
use datafusion::prelude::*;
use datafusion::datasource::MemTable;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;
use arrow::array::{UInt32Array, Int64Array};
use arrow::datatypes::{Schema, Field, DataType};
use arrow::csv;
use std::fs::File;
use std::sync::Arc;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int64, false),
Field::new("b", DataType::Int64, false),
]);
let file = File::open("tests/example.csv")?;
let mut csv = csv::Reader::new(file, Arc::new(schema), true, None, 1024, None, None);
let batch = csv.next().unwrap()?;
let length = batch.num_rows() as u32;
let idx_array = UInt32Array::from((0..length).collect::<Vec<u32>>());
let a_array = Int64Array::from(batch.column(0).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec());
let b_array = Int64Array::from(batch.column(1).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec());
let new_schema = Schema::new(vec![
Field::new("idx", DataType::UInt32, true),
Field::new("a", DataType::Int64, false),
Field::new("b", DataType::Int64, false),
]);
let new_batch = RecordBatch::try_new(Arc::new(new_schema),
vec![Arc::new(idx_array), Arc::new(a_array), Arc::new(b_array)])?;
let mem_table = MemTable::try_new(new_batch.schema(), vec![vec![new_batch]])?;
let mut ctx = ExecutionContext::new();
// create the dataframe
let df = ctx.read_table(Arc::new(mem_table))?;
let results = df.collect().await?;
print_batches(&results).unwrap();
// do whatever you need to do
// do whatever you need to do
// do whatever you need to do
Ok(())
}
我的example.csv看起来像这样...
a,b
1,2
1,3
4,2
2,6
3,7
输出应该是...
+-----+---+---+
| idx | a | b |
+-----+---+---+
| 0 | 1 | 2 |
| 1 | 1 | 3 |
| 2 | 4 | 2 |
| 3 | 2 | 6 |
| 4 | 3 | 7 |
+-----+---+---+
不过,如果您真的只是想在 python 中寻找具有 pandas 等功能的 crate,我建议您查看 polars。
鉴于 Ian Grahams 的结帐建议 polars,我想我也举了一个例子来说明如何在极地中实现这一点:
use polars::prelude::*;
use std::io::Cursor;
fn main() -> Result<()> {
// use an in memory repr for the csv
let csv = Cursor::new(
"a,b
1,2
1,3
4,2
2,6
3,7
",
);
// parse the csv into a DataFrame
let mut df = CsvReader::new(csv).finish()?;
// create the index column based on the dataframes height
// note that we use the `NoNull` wrapper to create from `T` instead of `Option<T>`
let mut idx: NoNull<UInt32Chunked> = (0..df.height() as u32).collect();
idx.rename("idx");
// add the index column to the DataFrame
df.insert_at_idx(0, idx.into_inner().into_series())?;
// print output
dbg!(df);
Ok(())
}
输出:
+-----+-----+-----+
| idx | a | b |
| --- | --- | --- |
| u32 | i64 | i64 |
+=====+=====+=====+
| 0 | 1 | 2 |
+-----+-----+-----+
| 1 | 1 | 3 |
+-----+-----+-----+
| 2 | 4 | 2 |
+-----+-----+-----+
| 3 | 2 | 6 |
+-----+-----+-----+
| 4 | 3 | 7 |
+-----+-----+-----+