如何使用启用 Futures 的超块的 Serde 零拷贝反序列化来存储结果?
How do I store a result using Serde Zero-copy deserialization of a Futures-enabled Hyper Chunk?
我正在使用 futures、tokio、hyper 和 serde_json 来请求和反序列化一些我需要保留直到下一个请求的数据。我最初的想法是制作一个包含 hyper::Chunk
和从 Chunk
借用的反序列化数据的结构,但无法获得正确的生命周期。我尝试使用 rental crate,但我也无法使用它。也许我在声明缓冲区 Vec
之前使用了 'buffer
生命周期,但也许我搞砸了其他事情:
#[rental]
pub struct ChunkJson<T: serde::de::Deserialize<'buffer>> {
buffer: Vec<u8>,
json: T
}
有什么方法可以使生命周期正确,还是我应该只使用 DeserializeOwned
并放弃零拷贝?
对于更多上下文,以下代码有效(定期从两个 URL 反序列化 JSON,保留结果以便我们可以对它们进行一些处理)。我想更改我的 X
和 Y
类型以将 Cow<'a, str>
用于它们的字段,从 DeserializeOwned
更改为 Deserialize<'a>
。为此,我需要为每个存储已反序列化的切片,但我不知道该怎么做。我正在寻找使用 Serde 的零拷贝反序列化并保留结果的示例,或者一些重构我的代码的想法。
#[macro_use]
extern crate serde_derive;
extern crate serde;
extern crate serde_json;
extern crate futures;
extern crate tokio_core;
extern crate tokio_periodic;
extern crate hyper;
use std::collections::HashMap;
use std::error::Error;
use futures::future;
use futures::Future;
use futures::stream::Stream;
use hyper::Client;
fn stream_json<'a, T: serde::de::DeserializeOwned + Send + 'a>
(handle: &tokio_core::reactor::Handle,
url: String,
period: u64)
-> Box<Stream<Item = T, Error = Box<Error>> + 'a> {
let client = Client::new(handle);
let timer = tokio_periodic::PeriodicTimer::new(handle).unwrap();
timer
.reset(::std::time::Duration::new(period, 0))
.unwrap();
Box::new(futures::Stream::zip(timer.from_err::<Box<Error>>(), futures::stream::unfold( (), move |_| {
let uri = url.parse::<hyper::Uri>().unwrap();
let get = client.get(uri).from_err::<Box<Error>>().and_then(|res| {
res.body().concat().from_err::<Box<Error>>().and_then(|chunks| {
let p: Result<T, Box<Error>> = serde_json::from_slice::<T>(chunks.as_ref()).map_err(|e| Box::new(e) as Box<Error>);
match p {
Ok(json) => future::ok((json, ())),
Err(err) => future::err(err)
}
})
});
Some(get)
})).map(|x| { x.1 }))
}
#[derive(Serialize, Deserialize, Debug)]
pub struct X {
foo: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Y {
bar: String,
}
fn main() {
let mut core = tokio_core::reactor::Core::new().unwrap();
let handle = core.handle();
let x_stream = stream_json::<HashMap<String, X>>(&handle, "http://localhost/X".to_string(), 2);
let y_stream = stream_json::<HashMap<String, Y>>(&handle, "http://localhost/Y".to_string(), 5);
let mut xy_stream = x_stream.merge(y_stream);
let mut last_x = HashMap::new();
let mut last_y = HashMap::new();
loop {
match core.run(futures::Stream::into_future(xy_stream)) {
Ok((Some(item), stream)) => {
match item {
futures::stream::MergedItem::First(x) => last_x = x,
futures::stream::MergedItem::Second(y) => last_y = y,
futures::stream::MergedItem::Both(x, y) => {
last_x = x;
last_y = y;
}
}
println!("\nx = {:?}", &last_x);
println!("y = {:?}", &last_y);
// Do more stuff with &last_x and &last_y
xy_stream = stream;
}
Ok((None, stream)) => xy_stream = stream,
Err(_) => {
panic!("error");
}
}
}
}
当试图解决一个复杂的编程问题时,尽可能多地删除是非常有用的。获取您的代码并删除您可以删除的内容,直到问题消失。稍微调整一下您的代码并继续删除,直到您不能再删除为止。然后,扭转问题,从最小的部分开始构建,然后重新解决错误。执行这两项操作将告诉您问题出在哪里。
首先,让我们确保反序列化正确:
extern crate serde;
extern crate serde_json;
#[macro_use]
extern crate serde_derive;
use std::borrow::Cow;
#[derive(Debug, Deserialize)]
pub struct Example<'a> {
#[serde(borrow)]
name: Cow<'a, str>,
key: bool,
}
impl<'a> Example<'a> {
fn info(&self) {
println!("{:?}", self);
match self.name {
Cow::Borrowed(_) => println!("Is borrowed"),
Cow::Owned(_) => println!("Is owned"),
}
}
}
fn main() {
let data: Vec<_> = br#"{"key": true, "name": "alice"}"#.to_vec();
let decoded: Example = serde_json::from_slice(&data).expect("Couldn't deserialize");
decoded.info();
}
这里,我忘了添加#[serde(borrow)]
属性,所以很高兴我做了这个测试!
接下来介绍出租箱:
#[macro_use]
extern crate rental;
rental! {
mod holding {
use super::*;
#[rental]
pub struct VecHolder {
data: Vec<u8>,
parsed: Example<'data>,
}
}
}
fn main() {
let data: Vec<_> = br#"{"key": true, "name": "alice"}"#.to_vec();
let holder = holding::VecHolder::try_new(data, |data| {
serde_json::from_slice(data)
});
let holder = match holder {
Ok(holder) => holder,
Err(_) => panic!("Unable to construct rental"),
};
holder.rent(|example| example.info());
// Make sure we can move the data and it's still valid
let holder2 = { holder };
holder2.rent(|example| example.info());
}
接下来我们尝试创建出租Chunk
:
#[rental]
pub struct ChunkHolder {
data: Chunk,
parsed: Example<'data>,
}
不幸的是,这失败了:
--> src/main.rs:29:1
|
29 | rental! {
| ^
|
= help: message: Field `data` must have an angle-bracketed type parameter or be `String`.
糟糕!选中 the docs for rental,我们可以将 #[target_ty_hack="[u8]"]
添加到 data
字段。这导致:
error[E0277]: the trait bound `hyper::Chunk: rental::__rental_prelude::StableDeref` is not satisfied
--> src/main.rs:29:1
|
29 | rental! {
| ^ the trait `rental::__rental_prelude::StableDeref` is not implemented for `hyper::Chunk`
|
= note: required by `rental::__rental_prelude::static_assert_stable_deref`
这很烦人;由于我们无法为 Chunk
实现该特征,我们只需要装箱 Chunk
,证明它有一个稳定的地址:
#[rental]
pub struct ChunkHolder {
data: Box<Chunk>,
parsed: Example<'data>,
}
我还查看了是否有办法从 Chunk
中恢复 Vec<u8>
,但似乎不存在。那将是另一种分配和间接更少的解决方案。
此时,"all" 剩下的就是将其集成回期货代码中。除了你之外,任何人都需要做很多工作来重新创建它,但我认为这样做不会有任何明显的问题。
我正在使用 futures、tokio、hyper 和 serde_json 来请求和反序列化一些我需要保留直到下一个请求的数据。我最初的想法是制作一个包含 hyper::Chunk
和从 Chunk
借用的反序列化数据的结构,但无法获得正确的生命周期。我尝试使用 rental crate,但我也无法使用它。也许我在声明缓冲区 Vec
之前使用了 'buffer
生命周期,但也许我搞砸了其他事情:
#[rental]
pub struct ChunkJson<T: serde::de::Deserialize<'buffer>> {
buffer: Vec<u8>,
json: T
}
有什么方法可以使生命周期正确,还是我应该只使用 DeserializeOwned
并放弃零拷贝?
对于更多上下文,以下代码有效(定期从两个 URL 反序列化 JSON,保留结果以便我们可以对它们进行一些处理)。我想更改我的 X
和 Y
类型以将 Cow<'a, str>
用于它们的字段,从 DeserializeOwned
更改为 Deserialize<'a>
。为此,我需要为每个存储已反序列化的切片,但我不知道该怎么做。我正在寻找使用 Serde 的零拷贝反序列化并保留结果的示例,或者一些重构我的代码的想法。
#[macro_use]
extern crate serde_derive;
extern crate serde;
extern crate serde_json;
extern crate futures;
extern crate tokio_core;
extern crate tokio_periodic;
extern crate hyper;
use std::collections::HashMap;
use std::error::Error;
use futures::future;
use futures::Future;
use futures::stream::Stream;
use hyper::Client;
fn stream_json<'a, T: serde::de::DeserializeOwned + Send + 'a>
(handle: &tokio_core::reactor::Handle,
url: String,
period: u64)
-> Box<Stream<Item = T, Error = Box<Error>> + 'a> {
let client = Client::new(handle);
let timer = tokio_periodic::PeriodicTimer::new(handle).unwrap();
timer
.reset(::std::time::Duration::new(period, 0))
.unwrap();
Box::new(futures::Stream::zip(timer.from_err::<Box<Error>>(), futures::stream::unfold( (), move |_| {
let uri = url.parse::<hyper::Uri>().unwrap();
let get = client.get(uri).from_err::<Box<Error>>().and_then(|res| {
res.body().concat().from_err::<Box<Error>>().and_then(|chunks| {
let p: Result<T, Box<Error>> = serde_json::from_slice::<T>(chunks.as_ref()).map_err(|e| Box::new(e) as Box<Error>);
match p {
Ok(json) => future::ok((json, ())),
Err(err) => future::err(err)
}
})
});
Some(get)
})).map(|x| { x.1 }))
}
#[derive(Serialize, Deserialize, Debug)]
pub struct X {
foo: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Y {
bar: String,
}
fn main() {
let mut core = tokio_core::reactor::Core::new().unwrap();
let handle = core.handle();
let x_stream = stream_json::<HashMap<String, X>>(&handle, "http://localhost/X".to_string(), 2);
let y_stream = stream_json::<HashMap<String, Y>>(&handle, "http://localhost/Y".to_string(), 5);
let mut xy_stream = x_stream.merge(y_stream);
let mut last_x = HashMap::new();
let mut last_y = HashMap::new();
loop {
match core.run(futures::Stream::into_future(xy_stream)) {
Ok((Some(item), stream)) => {
match item {
futures::stream::MergedItem::First(x) => last_x = x,
futures::stream::MergedItem::Second(y) => last_y = y,
futures::stream::MergedItem::Both(x, y) => {
last_x = x;
last_y = y;
}
}
println!("\nx = {:?}", &last_x);
println!("y = {:?}", &last_y);
// Do more stuff with &last_x and &last_y
xy_stream = stream;
}
Ok((None, stream)) => xy_stream = stream,
Err(_) => {
panic!("error");
}
}
}
}
当试图解决一个复杂的编程问题时,尽可能多地删除是非常有用的。获取您的代码并删除您可以删除的内容,直到问题消失。稍微调整一下您的代码并继续删除,直到您不能再删除为止。然后,扭转问题,从最小的部分开始构建,然后重新解决错误。执行这两项操作将告诉您问题出在哪里。
首先,让我们确保反序列化正确:
extern crate serde;
extern crate serde_json;
#[macro_use]
extern crate serde_derive;
use std::borrow::Cow;
#[derive(Debug, Deserialize)]
pub struct Example<'a> {
#[serde(borrow)]
name: Cow<'a, str>,
key: bool,
}
impl<'a> Example<'a> {
fn info(&self) {
println!("{:?}", self);
match self.name {
Cow::Borrowed(_) => println!("Is borrowed"),
Cow::Owned(_) => println!("Is owned"),
}
}
}
fn main() {
let data: Vec<_> = br#"{"key": true, "name": "alice"}"#.to_vec();
let decoded: Example = serde_json::from_slice(&data).expect("Couldn't deserialize");
decoded.info();
}
这里,我忘了添加#[serde(borrow)]
属性,所以很高兴我做了这个测试!
接下来介绍出租箱:
#[macro_use]
extern crate rental;
rental! {
mod holding {
use super::*;
#[rental]
pub struct VecHolder {
data: Vec<u8>,
parsed: Example<'data>,
}
}
}
fn main() {
let data: Vec<_> = br#"{"key": true, "name": "alice"}"#.to_vec();
let holder = holding::VecHolder::try_new(data, |data| {
serde_json::from_slice(data)
});
let holder = match holder {
Ok(holder) => holder,
Err(_) => panic!("Unable to construct rental"),
};
holder.rent(|example| example.info());
// Make sure we can move the data and it's still valid
let holder2 = { holder };
holder2.rent(|example| example.info());
}
接下来我们尝试创建出租Chunk
:
#[rental]
pub struct ChunkHolder {
data: Chunk,
parsed: Example<'data>,
}
不幸的是,这失败了:
--> src/main.rs:29:1
|
29 | rental! {
| ^
|
= help: message: Field `data` must have an angle-bracketed type parameter or be `String`.
糟糕!选中 the docs for rental,我们可以将 #[target_ty_hack="[u8]"]
添加到 data
字段。这导致:
error[E0277]: the trait bound `hyper::Chunk: rental::__rental_prelude::StableDeref` is not satisfied
--> src/main.rs:29:1
|
29 | rental! {
| ^ the trait `rental::__rental_prelude::StableDeref` is not implemented for `hyper::Chunk`
|
= note: required by `rental::__rental_prelude::static_assert_stable_deref`
这很烦人;由于我们无法为 Chunk
实现该特征,我们只需要装箱 Chunk
,证明它有一个稳定的地址:
#[rental]
pub struct ChunkHolder {
data: Box<Chunk>,
parsed: Example<'data>,
}
我还查看了是否有办法从 Chunk
中恢复 Vec<u8>
,但似乎不存在。那将是另一种分配和间接更少的解决方案。
此时,"all" 剩下的就是将其集成回期货代码中。除了你之外,任何人都需要做很多工作来重新创建它,但我认为这样做不会有任何明显的问题。