How can two consumers use the same actix field stream?
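I have an actix web service and would like to parse the contents of a multipart field with async-gcode while it is being streamed, and additionally store the contents, e.g. in a database.
However, I can't figure out how to feed the stream into the Parser and at the same time collect the bytes into a Vec<u8> or a String.
The first problem I face is that field is a stream of actix_web::web::Bytes rather than a stream of u8.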
#[post("/upload")]
pub async fn upload_job(
    mut payload: Multipart,
) -> Result<HttpResponse, Error> {
    let mut contents : Vec<u8> = Vec::new();
    while let Ok(Some(mut field)) = payload.try_next().await {
        let content_disp = field.content_disposition().unwrap();
        match content_disp.get_name().unwrap() {
            "file" => {
                while let Some(chunk) = field.next().await {
                    contents.append(&mut chunk.unwrap().to_vec());
                    // already parse the contents
                    // and additionally store contents somewhere
                }
            }
            _ => (),
        }
    }
    Ok(HttpResponse::Ok().finish())
}
Any hints or suggestions are much appreciated.
One option is to wrap the field in a struct and implement the Stream trait for it.
use actix_multipart::{Field, Multipart};
use actix_web::{post, Error, HttpResponse};
use async_gcode::{Error as PError, Parser};
use bytes::BytesMut;
use futures_util::stream::{Stream, StreamExt, TryStreamExt};
use std::cell::RefCell;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

/// Yields the field's bytes one at a time and keeps a copy of every chunk
/// in a buffer that is shared with the caller.
pub struct Wrapper {
    field: Field,
    buffer: Rc<RefCell<BytesMut>>,
    index: usize,
}

impl Wrapper {
    pub fn new(field: Field, buffer: Rc<RefCell<BytesMut>>) -> Self {
        buffer.borrow_mut().truncate(0);
        Wrapper { field, buffer, index: 0 }
    }
}

impl Stream for Wrapper {
    type Item = Result<u8, PError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Hand out the next buffered byte, if there is one.
            let byte = self.buffer.borrow().get(self.index).copied();
            if let Some(b) = byte {
                self.index += 1;
                return Poll::Ready(Some(Ok(b)));
            }
            // Buffer exhausted: pull the next chunk from the field and retry,
            // instead of ending the stream after the first chunk.
            match Pin::new(&mut self.field).poll_next(cx) {
                Poll::Ready(Some(Ok(chunk))) => self.buffer.borrow_mut().extend_from_slice(&chunk),
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Ready(Some(Err(_))) => {
                    return Poll::Ready(Some(Err(PError::BadNumberFormat /* ??? */)))
                }
            }
        }
    }
}

#[post("/upload")]
pub async fn upload_job(mut payload: Multipart) -> Result<HttpResponse, Error> {
    while let Ok(Some(field)) = payload.try_next().await {
        let content_disp = field.content_disposition().unwrap();
        match content_disp.get_name().unwrap() {
            "file" => {
                // Rc lets the wrapper and the handler share one buffer;
                // cloning a bare RefCell would copy the buffer instead.
                let contents = Rc::new(RefCell::new(BytesMut::new()));
                let w = Wrapper::new(field, Rc::clone(&contents));
                let mut p = Parser::new(w);
                while let Some(res) = p.next().await {
                    // Do something with results
                }
                // Do something with the buffer
                let a = contents.borrow().first().copied();
            }
            _ => (),
        }
    }
    Ok(HttpResponse::Ok().finish())
}
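One detail to watch in a wrapper like this is how the buffer is shared: cloning a plain RefCell copies its contents, so the caller only sees the collected bytes if both sides hold the same Rc<RefCell<_>>. Below is a minimal synchronous sketch of that idea using only std. ByteTee and run_tee are hypothetical names, and an iterator of Vec<u8> chunks stands in for the Field, so this is an illustration rather than actix code.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for the wrapper: yields single bytes from a sequence
/// of chunks and records every chunk in a buffer shared with the caller.
pub struct ByteTee<I> {
    chunks: I,
    buffer: Rc<RefCell<Vec<u8>>>,
    index: usize,
}

impl<I: Iterator<Item = Vec<u8>>> ByteTee<I> {
    pub fn new(chunks: I, buffer: Rc<RefCell<Vec<u8>>>) -> Self {
        buffer.borrow_mut().clear();
        ByteTee { chunks, buffer, index: 0 }
    }
}

impl<I: Iterator<Item = Vec<u8>>> Iterator for ByteTee<I> {
    type Item = u8;

    fn next(&mut self) -> Option<u8> {
        loop {
            // Serve the next unread byte if the buffer already holds one.
            let byte = self.buffer.borrow().get(self.index).copied();
            if let Some(b) = byte {
                self.index += 1;
                return Some(b);
            }
            // Otherwise pull another chunk; an empty chunk simply loops again.
            let chunk = self.chunks.next()?;
            self.buffer.borrow_mut().extend_from_slice(&chunk);
        }
    }
}

/// Runs the tee and returns (bytes seen by the consumer, bytes left in the buffer).
pub fn run_tee(chunks: Vec<Vec<u8>>) -> (Vec<u8>, Vec<u8>) {
    let shared = Rc::new(RefCell::new(Vec::new()));
    let tee = ByteTee::new(chunks.into_iter(), Rc::clone(&shared));
    let seen: Vec<u8> = tee.collect();
    let stored = shared.borrow().clone();
    (seen, stored)
}
```

Calling run_tee with a few chunks should give back the same bytes twice: once as consumed byte by byte, once as accumulated in the shared buffer.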
There will be no need to copy the Bytes out of the Field once Bytes::try_unsplit is implemented (https://github.com/tokio-rs/bytes/issues/287).
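dmitryvm's answer (thanks for the effort) showed me that there are actually two problems: first, flattening the Bytes into u8, and second, "splitting" the stream so that it feeds both a buffer for later storage and the async-gcode parser.
This is how I solved it: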
use actix_multipart::Multipart;
use actix_web::{post, Error, HttpResponse};
use async_gcode::Parser;
use futures::stream::{self, StreamExt, TryStreamExt};

#[post("/upload")]
pub async fn upload_job(
    mut payload: Multipart,
) -> Result<HttpResponse, Error> {
    let mut contents: Vec<u8> = Vec::new();
    while let Ok(Some(field)) = payload.try_next().await {
        let content_disp = field.content_disposition().unwrap();
        match content_disp.get_name().unwrap() {
            "file" => {
                let field_stream = field
                    .map_err(|_| async_gcode::Error::BadNumberFormat) // Translate the error type
                    .map_ok(|y| { // Turn each Bytes chunk into a stream of u8
                        contents.extend_from_slice(&y); // Copy and store for later use
                        stream::iter(y).map(Result::<_, async_gcode::Error>::Ok)
                    })
                    .try_flatten(); // Flatten the per-chunk streams into one stream of u8
                let mut parser = Parser::new(field_stream);
                while let Some(gcode) = parser.next().await {
                    // Process result from parser
                }
            }
            _ => (),
        }
    }
    Ok(HttpResponse::Ok().finish())
}
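For anyone who wants to try the flatten-and-copy pattern without actix or async-gcode, here is a rough synchronous analogue built on std iterators. flat_map plays the role that map_ok followed by try_flatten plays in the async version. ParseError and flatten_and_store are made-up names, and the String error only stands in for the multipart error, so treat this as a sketch of the idea rather than a drop-in replacement.

```rust
/// Hypothetical error type standing in for the parser's error.
#[derive(Debug, PartialEq)]
pub struct ParseError;

/// Flattens fallible chunks into fallible bytes while copying every good
/// chunk into a side buffer. Returns (items seen by the consumer, stored bytes).
pub fn flatten_and_store(
    chunks: Vec<Result<Vec<u8>, String>>,
) -> (Vec<Result<u8, ParseError>>, Vec<u8>) {
    let mut contents: Vec<u8> = Vec::new();
    let items: Vec<Result<u8, ParseError>> = {
        // The iterator is lazy: a chunk is copied only when the consumer reaches it.
        let bytes = chunks.into_iter().flat_map(
            |chunk| -> Box<dyn Iterator<Item = Result<u8, ParseError>>> {
                match chunk {
                    Ok(data) => {
                        // Copy and store for later use, then hand out single bytes.
                        contents.extend_from_slice(&data);
                        Box::new(data.into_iter().map(Ok::<u8, ParseError>))
                    }
                    // Translate the upstream error into the consumer's error type.
                    Err(_) => Box::new(std::iter::once(Err(ParseError))),
                }
            },
        );
        // Stand-in for the parser: it just drains the byte stream.
        bytes.collect()
    };
    (items, contents)
}
```

In this sketch the consumer collects everything, including errors; a real parser would decide for itself whether to stop at the first Err.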