How can two consumers use an actix field stream?

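I have an actix web service and would like to parse the contents of a multipart field with async-gcode while it is still streaming in, and additionally store the contents, e.g. in a database.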

However, I don't know how to feed the stream into the Parser and, at the same time, collect the bytes into a Vec<u8> or a String.

The first problem I face is that field is a stream of actix_web::web::Bytes chunks, not a stream of u8.

use actix_multipart::Multipart;
use actix_web::{post, Error, HttpResponse};
use futures::{StreamExt, TryStreamExt};

#[post("/upload")]
pub async fn upload_job(
    mut payload: Multipart,
) -> Result<HttpResponse, Error> {
    let mut contents: Vec<u8> = Vec::new();
    while let Ok(Some(mut field)) = payload.try_next().await {
        let content_disp = field.content_disposition().unwrap();
        match content_disp.get_name().unwrap() {
            "file" => {
                // Each item is a chunk of Bytes, not a single u8
                while let Some(chunk) = field.next().await {
                    contents.extend_from_slice(&chunk.unwrap());
                    // TODO: parse the contents here, as they arrive,
                    // and additionally store the contents somewhere
                }
            }
            _ => (),
        }
    }
    Ok(HttpResponse::Ok().finish())
}

Any hints or suggestions are much appreciated.

One option is to wrap the field in a struct and implement the Stream trait for it.

use actix_multipart::{Field, Multipart};
use actix_web::{post, Error, HttpResponse};
use async_gcode::{Error as PError, Parser};
use bytes::BytesMut;
use futures_util::stream::{Stream, TryStreamExt};
use std::cell::RefCell;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

/// Adapts a multipart field (a stream of Bytes chunks) into a stream of
/// single bytes, keeping a copy of everything read in a shared buffer.
pub struct Wrapper {
    field: Field,
    buffer: Rc<RefCell<BytesMut>>,
    index: usize,
}

impl Wrapper {
    pub fn new(field: Field, buffer: Rc<RefCell<BytesMut>>) -> Self {
        buffer.borrow_mut().clear();
        Wrapper {
            field,
            buffer,
            index: 0
        }
    }
}

impl Stream for Wrapper {
    type Item = Result<u8, PError>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<u8, PError>>> {
        loop {
            // Hand out bytes that are already buffered, one at a time.
            let next = self.buffer.borrow().get(self.index).copied();
            if let Some(b) = next {
                self.index += 1;
                return Poll::Ready(Some(Ok(b)));
            }
            // Buffer exhausted: fetch the next chunk, then loop to serve it.
            let polled = Pin::new(&mut self.field).poll_next(cx);
            match polled {
                Poll::Ready(Some(Ok(chunk))) => self.buffer.borrow_mut().extend_from_slice(&chunk),
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => return Poll::Ready(None),
                // Placeholder: the multipart error is mapped to an arbitrary parser error.
                Poll::Ready(Some(Err(_))) => return Poll::Ready(Some(Err(PError::BadNumberFormat/* ??? */)))
            }
        }
    }
}

#[post("/upload")]
pub async fn upload_job(
    mut payload: Multipart,
) -> Result<HttpResponse, Error> {
    while let Ok(Some(field)) = payload.try_next().await {
        let content_disp = field.content_disposition().unwrap();
        match content_disp.get_name().unwrap() {
            "file" => {
                // Share one buffer between the wrapper and this handler.
                // Cloning a bare RefCell would copy the buffer instead of sharing it.
                let contents = Rc::new(RefCell::new(BytesMut::new()));
                let w = Wrapper::new(field, Rc::clone(&contents));
                let mut p = Parser::new(w);
                while let Some(res) = p.next().await {
                    // Do something with results
                }
                // Do something with the buffer, e.g. store it
                let stored: Vec<u8> = contents.borrow().to_vec();
            }
            _ => (),
        }
    }
    Ok(HttpResponse::Ok().finish())
}

Copying the Bytes out of the Field will no longer be necessary once Bytes::try_unsplit is implemented (https://github.com/tokio-rs/bytes/issues/287).
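
The buffering logic of such a wrapper can be tried out without actix or async-gcode. The following is only a synchronous sketch of the idea, using a plain Iterator in place of Stream; ByteTee and its fields are hypothetical names, and Vec<u8> chunks stand in for the Bytes chunks of a real Field.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for the wrapper: yields single bytes from an
/// iterator of chunks and copies every chunk into a shared buffer.
pub struct ByteTee<I> {
    chunks: I,
    buffer: Rc<RefCell<Vec<u8>>>, // shared with the caller
    index: usize,                 // next unread position in `buffer`
}

impl<I> ByteTee<I> {
    pub fn new(chunks: I, buffer: Rc<RefCell<Vec<u8>>>) -> Self {
        buffer.borrow_mut().clear();
        ByteTee { chunks, buffer, index: 0 }
    }
}

impl<I: Iterator<Item = Vec<u8>>> Iterator for ByteTee<I> {
    type Item = u8;

    fn next(&mut self) -> Option<u8> {
        loop {
            // Serve bytes that were already copied into the shared buffer.
            let next = self.buffer.borrow().get(self.index).copied();
            if let Some(byte) = next {
                self.index += 1;
                return Some(byte);
            }
            // Buffer exhausted: pull the next chunk, or stop when the source ends.
            let chunk = self.chunks.next()?;
            self.buffer.borrow_mut().extend_from_slice(&chunk);
        }
    }
}
```

The loop matters: after a new chunk is appended, the code goes around again to serve its first byte, and an empty chunk simply triggers another pull instead of ending the sequence. The caller keeps its own Rc handle, so the collected bytes remain available after the consumer has finished.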

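The answer from dmitryvm (thanks for your effort) showed me that there are actually two problems: first, flattening the Bytes chunks into u8, and second, "splitting" the stream so that it feeds both a buffer for later storage and the async-gcode parser.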

This is how I solved it:

use actix_multipart::Multipart;
use actix_web::{post, Error, HttpResponse};
use async_gcode::Parser;
use futures::stream::{self, StreamExt, TryStreamExt};

#[post("/upload")]
pub async fn upload_job(
    mut payload: Multipart,
) -> Result<HttpResponse, Error> {
    let mut contents: Vec<u8> = Vec::new();
    while let Ok(Some(field)) = payload.try_next().await {
        let content_disp = field.content_disposition().unwrap();
        match content_disp.get_name().unwrap() {
            "file" => {
                let field_stream = field
                    .map_err(|_| async_gcode::Error::BadNumberFormat) // Translate the error
                    .map_ok(|y| { // Turn each Bytes chunk into a stream of u8
                        contents.extend_from_slice(&y); // Copy and store for later use
                        stream::iter(y).map(Result::<_, async_gcode::Error>::Ok)
                    })
                    .try_flatten(); // Flatten the stream of streams into one stream of u8
                let mut parser = Parser::new(field_stream);
                while let Some(gcode) = parser.next().await {
                    // Process result from parser
                }
            }
            _ => (),
        }
    }
    Ok(HttpResponse::Ok().finish())
}
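
The same tee-and-flatten pattern can be sketched with plain iterators, which makes it easy to experiment with outside a web handler. This is only a synchronous analogue under stated assumptions: tee_bytes and ParseError are hypothetical names, ParseError stands in for async_gcode::Error, and the String error stands in for the multipart error.

```rust
/// Hypothetical error type standing in for the parser's error.
#[derive(Debug, PartialEq)]
pub struct ParseError;

/// Flattens fallible chunks into fallible single bytes and copies every
/// successfully read chunk into `contents` along the way.
pub fn tee_bytes<'a>(
    chunks: impl Iterator<Item = Result<Vec<u8>, String>> + 'a,
    contents: &'a mut Vec<u8>,
) -> impl Iterator<Item = Result<u8, ParseError>> + 'a {
    chunks.flat_map(move |chunk| -> Vec<Result<u8, ParseError>> {
        match chunk {
            Ok(bytes) => {
                // Keep a copy for later storage.
                contents.extend_from_slice(&bytes);
                // Hand the bytes on one by one.
                bytes.into_iter().map(Ok::<u8, ParseError>).collect()
            }
            // Translate the source error into the consumer's error type.
            Err(_) => vec![Err(ParseError)],
        }
    })
}
```

Because the copy happens inside the adapter, the downstream consumer only ever sees a sequence of Result<u8, _> items, which is the shape the parser in the handler above is given. Once the consumer is done and the adapter is dropped, the mutable borrow of contents ends and the collected bytes can be stored.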