有没有办法从 python 调用 rust 的异步接口?
Is there a way to call async interface of rust from python?
我将 rust 的 reqwest
的一些函数包装到 req.lib
文件中,并使用 cffi
从 python 成功调用它。然而 reqwest::blocking::Client
迫使我在 python 中使用多线程。我发现 reqwest
可以在 Rust 中以异步模式调用。我想知道有没有办法让 req.lib
异步?即使是半异步对我来说也可以。
例如,当前存根签名为:
#[no_mangle]
pub extern "C" fn urlopen(url: *const c_char) -> *mut c_char
我可以这样写吗:
#[no_mangle]
pub extern "C" fn urlopen(url: *const c_char) -> u64 // return request unique id
#[no_mangle]
pub extern "C" fn is_finished(req_id: u64) -> bool // whether given request is done
#[no_mangle]
pub extern "C" fn fetch_result(req_id: u64) -> *mut c_char // fetch response
因此 cffi
调用不再锁定主线程。我可以使用单线程调用多个请求。欢迎任何建议或最佳实践。
异步代码通过特殊的运行时间执行,对于python和rust,这些是不同且不兼容的库。在那里,您不能简单地在语言之间共享未来,它必须 运行 使用与创建它相同的语言。
至于你的例子,这意味着你需要 运行 在 rust 执行器(例如在 tokio 中)中 Client
然后从中得到反馈。作为最简单的方法,您可以创建一个全局的:
use lazy_static::lazy_static;
use tokio::runtime::Runtime;
lazy_static! {
static ref RUNTIME: Runtime = Runtime::new().unwrap();
}
然后在产卵后你需要有一个反馈,所以你可以使用几个带有状态和结果的地图:
use std::collections::HashMap;
use std::sync::RwLock;
use futures::prelude::*;
use tokio::sync::oneshot;
type FutureId = u64;
type UrlResult = reqwest::Result<String>;
type SyncMap<K, V> = RwLock<HashMap<K, V>>;
lazy_static! {
// Map for feedback channels. Once result is computed, it is stored at `RESULTS`
static ref STATUSES: SyncMap<FutureId, oneshot::Receiver<UrlResult>> = SyncMap::default();
// Cache storage for results
static ref RESULTS: SyncMap<FutureId, UrlResult> = SyncMap::default();
}
fn gen_unique_id() -> u64 { .. }
#[no_mangle]
pub extern "C" fn urlopen(url: *const c_char) -> FutureId {
let url: &str = /* convert url */;
let (tx, rx) = oneshot::channel();
RUNTIME.spawn(async move {
let body = reqwest::get(url).and_then(|b| b.text()).await;
tx.send(body).unwrap(); // <- this one should be handled somehow
});
let id = gen_unique_id();
STATUSES.write().unwrap().insert(id, rx);
id
}
这里,每urlopen
个请求都会创建oneshot::channel
,这会延迟一个执行结果。所以可以检查是否完成:
#[no_mangle]
pub extern "C" fn is_finished(req_id: u64) -> bool {
// first check in cache
if RESULTS.read().unwrap().contains_key(&req_id) {
true
} else {
let mut res = RESULTS.write().unwrap();
let mut statuses = STATUSES.write().unwrap();
// if nothing in cache, check the feedback channel
if let Some(rx) = statuses.get_mut(&req_id) {
let val = match rx.try_recv() {
Ok(val) => val,
Err(_) => {
// handle error somehow here
return true;
}
};
// and cache the result, if available
res.insert(req_id, val);
true
} else {
// Unknown request id
true
}
}
}
那么获取结果就很简单了:
#[no_mangle]
pub extern "C" fn fetch_result(req_id: u64) -> *const c_char {
let res = RESULTS.read().unwrap();
res.get(&req_id)
// there `ok()` should probably be handled in some better way
.and_then(|val| val.as_ref().ok())
.map(|val| val.as_ptr())
.unwrap_or(std::ptr::null()) as *const _
}
Playground link.
请记住,上述解决方案有其优点:
- 结果已缓存,可多次获取;
- API 是(希望)线程安全的;
- 读写锁分离,这可能是比互斥锁更快的解决方案;
还有明显的缺点:
RESULTS
无限增长且从未清除;
- 线程安全使事情变得有点复杂,因此可能不需要,
thread_local!
可用于全局而不是锁;
- 缺乏适当的错误处理;
- 使用了 RwLock,它有时可能比其他一些原语表现更差;
STATUSES
在 is_finished
获得写入权限,但最好先获得读取权限;
我将 rust 的 reqwest
的一些函数包装到 req.lib
文件中,并使用 cffi
从 python 成功调用它。然而 reqwest::blocking::Client
迫使我在 python 中使用多线程。我发现 reqwest
可以在 Rust 中以异步模式调用。我想知道有没有办法让 req.lib
异步?即使是半异步对我来说也可以。
例如,当前存根签名为:
#[no_mangle]
pub extern "C" fn urlopen(url: *const c_char) -> *mut c_char
我可以这样写吗:
#[no_mangle]
pub extern "C" fn urlopen(url: *const c_char) -> u64 // return request unique id
#[no_mangle]
pub extern "C" fn is_finished(req_id: u64) -> bool // whether given request is done
#[no_mangle]
pub extern "C" fn fetch_result(req_id: u64) -> *mut c_char // fetch response
因此 cffi
调用不再锁定主线程。我可以使用单线程调用多个请求。欢迎任何建议或最佳实践。
异步代码通过特殊的运行时间执行,对于python和rust,这些是不同且不兼容的库。在那里,您不能简单地在语言之间共享未来,它必须 运行 使用与创建它相同的语言。
至于你的例子,这意味着你需要 运行 在 rust 执行器(例如在 tokio 中)中 Client
然后从中得到反馈。作为最简单的方法,您可以创建一个全局的:
use lazy_static::lazy_static;
use tokio::runtime::Runtime;
lazy_static! {
static ref RUNTIME: Runtime = Runtime::new().unwrap();
}
然后在产卵后你需要有一个反馈,所以你可以使用几个带有状态和结果的地图:
use std::collections::HashMap;
use std::sync::RwLock;
use futures::prelude::*;
use tokio::sync::oneshot;
type FutureId = u64;
type UrlResult = reqwest::Result<String>;
type SyncMap<K, V> = RwLock<HashMap<K, V>>;
lazy_static! {
// Map for feedback channels. Once result is computed, it is stored at `RESULTS`
static ref STATUSES: SyncMap<FutureId, oneshot::Receiver<UrlResult>> = SyncMap::default();
// Cache storage for results
static ref RESULTS: SyncMap<FutureId, UrlResult> = SyncMap::default();
}
fn gen_unique_id() -> u64 { .. }
#[no_mangle]
pub extern "C" fn urlopen(url: *const c_char) -> FutureId {
let url: &str = /* convert url */;
let (tx, rx) = oneshot::channel();
RUNTIME.spawn(async move {
let body = reqwest::get(url).and_then(|b| b.text()).await;
tx.send(body).unwrap(); // <- this one should be handled somehow
});
let id = gen_unique_id();
STATUSES.write().unwrap().insert(id, rx);
id
}
这里,每urlopen
个请求都会创建oneshot::channel
,这会延迟一个执行结果。所以可以检查是否完成:
#[no_mangle]
pub extern "C" fn is_finished(req_id: u64) -> bool {
// first check in cache
if RESULTS.read().unwrap().contains_key(&req_id) {
true
} else {
let mut res = RESULTS.write().unwrap();
let mut statuses = STATUSES.write().unwrap();
// if nothing in cache, check the feedback channel
if let Some(rx) = statuses.get_mut(&req_id) {
let val = match rx.try_recv() {
Ok(val) => val,
Err(_) => {
// handle error somehow here
return true;
}
};
// and cache the result, if available
res.insert(req_id, val);
true
} else {
// Unknown request id
true
}
}
}
那么获取结果就很简单了:
#[no_mangle]
pub extern "C" fn fetch_result(req_id: u64) -> *const c_char {
let res = RESULTS.read().unwrap();
res.get(&req_id)
// there `ok()` should probably be handled in some better way
.and_then(|val| val.as_ref().ok())
.map(|val| val.as_ptr())
.unwrap_or(std::ptr::null()) as *const _
}
Playground link.
请记住,上述解决方案有其优点:
- 结果已缓存,可多次获取;
- API 是(希望)线程安全的;
- 读写锁分离,这可能是比互斥锁更快的解决方案;
还有明显的缺点:
RESULTS
无限增长且从未清除;- 线程安全使事情变得有点复杂,因此可能不需要,
thread_local!
可用于全局而不是锁; - 缺乏适当的错误处理;
- 使用了 RwLock,它有时可能比其他一些原语表现更差;
STATUSES
在is_finished
获得写入权限,但最好先获得读取权限;