Rust WASM 中函数 return 上的线程创建

Thread creation on function return in Rust WASM

我在 wasm 环境中使用 Polars。

我注意到 LazyFrame.collect 操作不一致,它有时会在处理某些数据集时创建线程。

这是与问题相关的代码

#[wasm_bindgen]
pub fn start(buff: &[u8],
    item_id:&str, 
    order_id:&str,
    item_name:&str) -> JsValue{

    let cursor = Cursor::new(buff);
    let lf = CsvReader::new(cursor).with_ignore_parser_errors(true).finish().unwrap().lazy();


    let df = lf.groupby([col(order_id)]);
    let df = df.agg([col(item_id),col(item_name)]);
    
    // Error occurs here
    let df = df.collect().unwrap();

} 

使用特定数据集时出现错误:

panicked at 'failed to spawn thread: Error { kind: Unsupported, message: "operation not supported on this platform" }'

因为它试图在 WASM 环境中生成线程。

但是,对于其他数据集,此过程将完美执行。而且它不会尝试创建线程。由于使用各种数据集进行测试,问题似乎不是文件大小。

我想知道 Lazyframe.collect 操作的哪一部分造成了这种不一致以及如何避免它。

working.csv

Order ID,Product ID,Product Name
InvoiceNo0,Product ID0,Product Name0
InvoiceNo0,Product ID1,Product Name1
InvoiceNo0,Product ID2,Product Name2
InvoiceNo0,Product ID3,Product Name3
InvoiceNo0,Product ID4,Product Name4
InvoiceNo0,Product ID5,Product Name5

不是working.csv

Order ID,Product ID,Product Name
B0000001,P0001,Product - 0001
B0000001,P0002,Product - 0002
B0000001,P0003,Product - 0003
B0000001,P0004,Product - 0004
B0000001,P0005,Product - 0005
B0000002,P0006,Product - 0006

允许wasm的Polars fork由 https://github.com/universalmind303/polars/tree/wasm

您可以在此处查看完整项目以及两个 CSV 文件: https://github.com/KivalM/lazyframe-min-test

编辑:describe_plan()

的输出

工作数据集

    [col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Order ID", "Product ID", "Product Name"];
    project */3 columns |   details: None;
    selection: "None"

数据集无效

    [col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Order ID", "Product ID", "Product Name"];
    project */3 columns |   details: None;
    selection: "None"

模式的输出()

工作数据集

name: Order ID, data type: Utf8
name: Product ID, data type: Utf8
name: Product Name, data type: Utf8

数据集无效

name: Order ID, data type: Utf8
name: Product ID, data type: Utf8
name: Product Name, data type: Utf8

输出describe_optimized_plan():

    [col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Product ID", "Product Name", "Order ID"];
    project 3/3 columns |   details: Some([col("Product ID"), col("Product Name"), col("Order ID")]);
    selection: "None"

编辑: 仔细查看源代码后。问题似乎并不直接来自任何极地代码。 我已将问题追踪到 polars-lazy/src/physical_plan/executors/groupby.rs Function

impl Executor for GroupByExec {
    fn execute

然后returns一个值来自

groupby_helper(df,keys,&self.aggs,self.apply.as_ref(),state,self.maintain_order,self.slice,)

但是,groupby_helper 函数运行完成,数据框创建成功。当数据帧从 groupby_helper 返回到 fn execute 时出现错误。奇怪的是,仅当此函数 returns 时才尝试创建线程。 RUST WASM 中是否存在可能导致此类行为的东西?

看起来我在创建分支时错过了 groupbys 的 std::thread 操作。

impl Drop for GroupsIdx {
    fn drop(&mut self) {
        let v = std::mem::take(&mut self.all);
        // ~65k took approximately 1ms on local machine, so from that point we drop on other thread
        // to stop query from being blocked
        if v.len() > 1 << 16 {
            std::thread::spawn(move || drop(v));
        } else {
            drop(v);
        }
    }
}

数据集大小决定线程生成。

任何大于 1 << 16 (~65k) 的 group 都会产生一个线程。

impl 标记为仅在 non-wasm 目标上编译的功能应该可以解决您的问题。