如何将 futures_io::AsyncRead 转换为 rusoto::ByteStream?
How do I convert a futures_io::AsyncRead to rusoto::ByteStream?
我正在尝试构建一个从 SFTP 服务器提取文件并将其上传到 S3 的服务。
对于 SFTP 部分,我使用 async-ssh2, which gives me a file handler implementing futures::AsyncRead
. Since these SFTP files may be quite large, I am trying to turn this File
handler into a ByteStream
that I can upload using Rusoto. It looks like a ByteStream
can be initialized with a futures::Stream
。
我的计划是在 File
对象上实现 Stream
(基于代码 here)以与 Rusoto 兼容(为后代复制下面的代码):
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};
pub struct ByteStream<R>(R);
impl<R: tokio::io::AsyncRead + Unpin> Stream for ByteStream<R> {
type Item = u8;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut buf = [0; 1];
match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf)) {
Ok(n) if n != 0 => Some(buf[0]).into(),
_ => None.into(),
}
}
}
这样做是不是一个好方法?我看到了this question, but it seems to be using tokio::io::AsyncRead
。使用 tokio
是执行此操作的规范方法吗?如果是这样,有没有办法将 futures_io::AsyncRead
转换为 tokio::io::AsyncRead
?
这就是我进行转换的方式。我基于上面的代码,只是我使用了更大的缓冲区 (8 KB) 来减少网络调用的次数。
use bytes::Bytes;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};
use futures_io::AsyncRead;
use rusoto_s3::StreamingBody;
const KB: usize = 1024;
struct ByteStream<R>(R);
impl<R: AsyncRead + Unpin> Stream for ByteStream<R> {
type Item = Result<Bytes, std::io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut buf = vec![0_u8; 8 * KB];
match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf[..])) {
Ok(n) if n != 0 => Some(Ok(Bytes::from(buf))).into(),
Ok(_) => None.into(),
Err(e) => Some(Err(e)).into(),
}
}
}
允许我这样做:
fn to_streamingbody(body: async_ssh2::File) -> Option<StreamingBody> {
let stream = ByteStream(body);
Some(StreamingBody::new(stream))
}
(注意rusoto::StreamingBody
和rusoto::ByteStream
是别名)
我正在尝试构建一个从 SFTP 服务器提取文件并将其上传到 S3 的服务。
对于 SFTP 部分,我使用 async-ssh2, which gives me a file handler implementing futures::AsyncRead
. Since these SFTP files may be quite large, I am trying to turn this File
handler into a ByteStream
that I can upload using Rusoto. It looks like a ByteStream
can be initialized with a futures::Stream
。
我的计划是在 File
对象上实现 Stream
(基于代码 here)以与 Rusoto 兼容(为后代复制下面的代码):
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};
pub struct ByteStream<R>(R);
impl<R: tokio::io::AsyncRead + Unpin> Stream for ByteStream<R> {
type Item = u8;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut buf = [0; 1];
match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf)) {
Ok(n) if n != 0 => Some(buf[0]).into(),
_ => None.into(),
}
}
}
这样做是不是一个好方法?我看到了this question, but it seems to be using tokio::io::AsyncRead
。使用 tokio
是执行此操作的规范方法吗?如果是这样,有没有办法将 futures_io::AsyncRead
转换为 tokio::io::AsyncRead
?
这就是我进行转换的方式。我基于上面的代码,只是我使用了更大的缓冲区 (8 KB) 来减少网络调用的次数。
use bytes::Bytes;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};
use futures_io::AsyncRead;
use rusoto_s3::StreamingBody;
const KB: usize = 1024;
struct ByteStream<R>(R);
impl<R: AsyncRead + Unpin> Stream for ByteStream<R> {
type Item = Result<Bytes, std::io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut buf = vec![0_u8; 8 * KB];
match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf[..])) {
Ok(n) if n != 0 => Some(Ok(Bytes::from(buf))).into(),
Ok(_) => None.into(),
Err(e) => Some(Err(e)).into(),
}
}
}
允许我这样做:
fn to_streamingbody(body: async_ssh2::File) -> Option<StreamingBody> {
let stream = ByteStream(body);
Some(StreamingBody::new(stream))
}
(注意rusoto::StreamingBody
和rusoto::ByteStream
是别名)