如何将所有行作为 tiberius 期货的向量?

How to get all rows as a vector with tiberius futures?

我需要使用 tiberius 收集 table 的所有行并输出它们。我的简单代码是:

extern crate futures;
extern crate tokio_core;
extern crate tiberius;

use futures::Future;
use tokio_core::reactor::Core;
use tiberius::SqlConnection;
use tiberius::stmt::ResultStreamExt;

fn main() {
    let mut core = Core::new().unwrap();

    let future = SqlConnection::connect(core.handle(), "server=tcp:127.0.0.1,1433;username=SA;password=qweasdZXC123!!;")
        .and_then(|conn| {
            let mut v: Vec<String> = Vec::new();

            conn.simple_query("SELECT id, name FROM test").for_each_row(|row| {

                let id: i32 = row.get(0);
                let name: &str = row.get(1);

                v.push(format!("{} - {}", id, name));

                Ok(())
            });

            println!("{:?}", v);

            Ok(())
        });

    core.run(future).unwrap();
}

此代码打印一个空向量,但我需要完整的字符串向量。我读过一些关于期货的文章,但作为 Rust 的新手,它们对我来说太复杂了。

在您的原始代码中,您有

conn.simple_query("SELECT id, name FROM users").for_each_row(|row| {
    // ...
}).wait().unwrap();

你说 "It compiles but hangs on request. I think problem in wait() call."。

如果您阅读 Future::wait 的文档,您会看到此警告,强调我的:

Note: This method is not appropriate to call on event loops or similar I/O situations because it will prevent the event loop from making progress (this blocks the thread). This method should only be called when it's guaranteed that the blocking work associated with this future will be completed by another thread.

在您更新的代码中,您有

conn.simple_query("SELECT id, name FROM test").for_each_row(|row| {
    // ...
});

这构造了一个 future 但随后立即将其丢弃,因此外部向量永远不会发生任何事情。正是出于这个原因,期货箱中的所有期货都附有警告:

warning: unused `futures::FutureResult` which must be used: futures do nothing unless polled

我已经提交 an issue 以便图书馆添加这个。


这是一段完全未经测试的代码。我没有 SQL 服务器实例来实际测试它,但它确实可以编译并且具有正确的形状。

extern crate futures;
extern crate futures_state_stream;
extern crate tokio_core;
extern crate tiberius;

use futures::{Future, Stream};
use futures_state_stream::StateStream;
use tiberius::SqlConnection;
use tokio_core::reactor::Core;

fn main() {
    let mut core = Core::new().unwrap();

    let connection_string = "server=tcp:127.0.0.1,1433;username=SA;password=qweasdZXC123!!;";

    let future = SqlConnection::connect(core.handle(), connection_string)
        .and_then(|conn| {

            let query = conn.query("SELECT * FROM test WHERE id > @P1", &[&0i32])
                .into_stream()
                .take(1);

            query.flatten()
                .map(|row| {
                    let id: i32 = row.get(0);
                    let name: &str = row.get(1);

                    format!("{} - {}", id, name)
                })
                .collect()
        });

    let all_rows = core.run(future).unwrap();
.
    println!("{:?}", all_rows);
}

要点:

  1. conn.query可以与多个查询语句一起使用,所以它returns一个的结果集.
  2. conn.query 实际上实现了 StateStream,而不是 futures::Stream。出于示例的目的,我将其转换回 futures::Stream.into_stream()。这是不理想的,因为我们失去了之后恢复 conn 的能力。
  3. 我只使用 .take(1) 的第一个结果集。
  4. 由于我们现在有 StreamStream,我们使用 Stream::flatten 来删除嵌套。
  5. 每行 map 绑定到 String
  6. Streamcollect 编辑成 Vec<String>
  7. 的单个 Future