特性 `std::marker::Copy` 没有为 Type 实现

the trait `std::marker::Copy` is not implemented for Type

我试图在 b/w 不同的线程中移动一些数据,但我收到了 ole Copy trait-not-implemented 错误。这是一些代码:

use std::future::Future;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};

/// Start external crates mocked here

#[derive(Clone, PartialEq, Eq)]
pub struct DecodeError {
    inner: Box<Inner>,
}
#[derive(Clone, PartialEq, Eq)]
struct Inner {}
#[derive(Clone)]
pub struct Connection {}

pub trait Message: core::fmt::Debug + Send + Sync {
    fn decode<B>(mut buf: B) -> Result<Self, DecodeError>
    where
        B: bytes::Buf,
        Self: Default,
    {
        // do stuff
        let mut message = Self::default();
        Ok(message)
    }
}

#[derive(Clone, Debug, Default)]
pub struct Request {}
impl Message for Request {}
#[derive(Clone, Debug, Default)]
pub struct Response {}
impl Message for Response {}

pub struct OtherResponse {}
pub enum ReplyError {
    InvalidData,
}
pub struct EventMessage {
    data: Vec<u8>,
}

pub struct Subscription {}

impl Subscription {
    pub async fn next(&self) -> Option<EventMessage> {
        Some(EventMessage { data: vec![] })
    }
}
/// End external crates mocked here

#[derive(Clone)]
pub struct Publisher<T> {
    connection: Connection,
    subject: String,
    resource_type: PhantomData<*const T>,
}

#[derive(Debug)]
pub enum PublishError {
    SerializeError(String),
    PublishError(String),
}

pub type PublishResult<T> = std::result::Result<T, PublishError>;

impl<T: Message> Publisher<T> {
    pub fn new(connection: Connection, subject: String) -> Self {
        let resource_type = PhantomData;

        Publisher {
            connection: connection,
            subject,
            resource_type,
        }
    }
    pub async fn publish(&self, msg: T) -> PublishResult<()>
    where
        T: Message,
    {
        // do stuff to msg
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let node = Node::new("127.0.0.1", "node".into())
        .await
        .expect("connecting to NATS");
    let p: Publisher<Request> = node.get_publisher("TOPIC".into());
    let _submission_replyer: AsynkReplyer<Request, Response> = node
        .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
            let mut req = req.clone().lock().unwrap();
            p.clone().publish(*req);
            Ok(Response {})
        })
        .await;

    Ok(())
}

pub struct Node {
    name: String,
    connection: Connection,
}

pub type ReplyResult<T> = std::result::Result<T, ReplyError>;

impl Node {
    pub async fn new(_nats_url: &str, name: String) -> std::io::Result<Self> {
        env_logger::init();

        let connection = Connection {};
        Ok(Node { name, connection })
    }

    pub fn get_publisher<T>(&self, subject: String) -> Publisher<T>
    where
        T: Message + Default,
    {
        Publisher::new(self.connection.clone(), subject)
    }

    pub async fn get_replyer<Req, Resp, Fut>(
        &self,
        subject: String,
        callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    ) -> AsynkReplyer<Req, Resp>
    where
        Req: Message + Default + 'static,
        Resp: Message + Default,
        Fut: Future<Output = ReplyResult<Resp>> + Send,
    {
        AsynkReplyer::new(&self.connection, subject, callback).await
    }
}

pub struct AsynkReplyer<Req, Resp> {
    request_type: PhantomData<Req>,
    response_type: PhantomData<Resp>,
}

impl<Req: Message + Default + 'static, Resp: Message + Default> AsynkReplyer<Req, Resp> {
    pub async fn new<Fut>(
        connection: &Connection,
        subject: String,
        callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    ) -> AsynkReplyer<Req, Resp>
    where
        Fut: Future<Output = ReplyResult<Resp>> + Send,
    {
        Self::start_subscription_handler(Subscription {}, callback).await;

        AsynkReplyer {
            request_type: PhantomData,
            response_type: PhantomData,
        }
    }

    pub async fn start_subscription_handler<Fut>(
        subscription: Subscription,
        callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    ) where
        Fut: Future<Output = ReplyResult<Resp>> + Send,
    {
        tokio::spawn(async move {
            loop {
                match subscription.next().await {
                    Some(msg) => {
                        Self::handle_request(msg, callback).await;
                    }
                    None => {
                        break;
                    }
                }
            }
        });
    }

    /// Decodes + spins up another task to handle the request
    pub async fn handle_request<Fut>(
        msg: EventMessage,
        callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    ) -> ReplyResult<()>
    where
        Fut: Future<Output = ReplyResult<Resp>> + Send,
    {
        let decoded = Req::decode(msg.data.as_slice()).map_err(|_| ReplyError::InvalidData)?;

        tokio::spawn(async move {
            match callback(Arc::new(Mutex::new(decoded))).await {
                Ok(response) => {
                    // do stuff
                }
                Err(e) => {}
            }
        });
        Ok(())
    }
}

错误:

error[E0277]: the trait bound `Publisher<Request>: std::marker::Copy` is not satisfied in `[closure@src/main.rs:93:40: 97:10]`
  --> src/main.rs:93:10
   |
93 |           .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
   |  __________^^^^^^^^^^^___________________-
   | |          |
   | |          within `[closure@src/main.rs:93:40: 97:10]`, the trait `std::marker::Copy` is not implemented for `Publisher<Request>`
94 | |             let mut req = req.clone().lock().unwrap();
95 | |             p.clone().publish(*req);
96 | |             Ok(Response {})
97 | |         })
   | |_________- within this `[closure@src/main.rs:93:40: 97:10]`
   |
   = note: required because it appears within the type `[closure@src/main.rs:93:40: 97:10]`

error[E0277]: `*const Request` cannot be sent between threads safely
  --> src/main.rs:93:10
   |
93 |           .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
   |  __________^^^^^^^^^^^___________________-
   | |          |
   | |          `*const Request` cannot be sent between threads safely
94 | |             let mut req = req.clone().lock().unwrap();
95 | |             p.clone().publish(*req);
96 | |             Ok(Response {})
97 | |         })
   | |_________- within this `[closure@src/main.rs:93:40: 97:10]`
   |
   = help: within `[closure@src/main.rs:93:40: 97:10]`, the trait `Send` is not implemented for `*const Request`
   = note: required because it appears within the type `PhantomData<*const Request>`
note: required because it appears within the type `Publisher<Request>`
  --> src/main.rs:53:12
   |
53 | pub struct Publisher<T> {
   |            ^^^^^^^^^
   = note: required because it appears within the type `[closure@src/main.rs:93:40: 97:10]`

error[E0277]: `*const Request` cannot be shared between threads safely
  --> src/main.rs:93:10
   |
93 |           .get_replyer("request".into(), move |req: Arc<Mutex<Request>>| async {
   |  __________^^^^^^^^^^^___________________-
   | |          |
   | |          `*const Request` cannot be shared between threads safely
94 | |             let mut req = req.clone().lock().unwrap();
95 | |             p.clone().publish(*req);
96 | |             Ok(Response {})
97 | |         })
   | |_________- within this `[closure@src/main.rs:93:40: 97:10]`
   |
   = help: within `[closure@src/main.rs:93:40: 97:10]`, the trait `Sync` is not implemented for `*const Request`
   = note: required because it appears within the type `PhantomData<*const Request>`
note: required because it appears within the type `Publisher<Request>`
  --> src/main.rs:53:12
   |
53 | pub struct Publisher<T> {
   |            ^^^^^^^^^
   = note: required because it appears within the type `[closure@src/main.rs:93:40: 97:10]`

我不能(或者我可以)在 Publisher 结构上添加 Copy 属性,但这不会起作用,因为并非所有字段都实现 Copy。尽管如此,我还是注释掉了 Publisher 中不包含 Copy 的字段,并将属性添加到其中只是为了查看,通过这种方法我得到:

the trait `std::marker::Copy` is not implemented for `Request`

Request 是一个使用 prost 库编译的基于 protobuf 的结构。我无法向其中添加 Copy 属性,因为它的某些字段未实现 Copy,例如 StringTimestamp.

我想知道这里的设计是天生不好还是有简单的修复方法。

这些信息真的很难帮到你。完整的错误代码可能会有用。

无论如何,在“impl Node ... get_replyer()”中你会看到回调应该return somehink that's implement Copy

pub async fn get_replyer<Req, Resp, Fut>(
    &self,
    subject: String,
    callback: impl Fn(Arc<Mutex<Req>>) -> Fut + Send + Sync + 'static + Copy,
    //-------------------------------------------------------------------^
) -> AsynkReplyer<Req, Resp>

主要

let _submission_replyer: AsynkReplyer<Resuest, Response> = node
    .get_replyer(
    "request".into(),
        move |req: Arc<Mutex<Request>>| async {
            
            let mut req = req.lock().unwrap();
            p.publish(*req);
            
            Ok(Response {
                header: None,
                response: Some(OtherResponse {
                    request: None,
                    status: 0,
                }),
            })
//-----------^------------------------------- Return a enum std::result::Result
        },
    )
    .await;
    

std::result::Result 实现 Copy 但关于 Response 呢?此时显示错误?

在我看来,您已将 Fn 限制为 Copy,因为您将其传递给多个 tokio::spawn 调用。您发现 Copy 非常严格,但 Clone 不是。您应该改用它,并在处理新请求时简单地调用 .clone()

Self::handle_request(msg, callback.clone()).await;

那么唯一的错误是'*const Request' cannot be sent between threads safely。编译器不会自动为指针实现 SendSync,因为它不知道这是否安全,但是您的 Fn 需要从不同的线程调用。幸运的是,您不必为此担心。无论你的 PhantomData<*const T> 只是为了满足编译器还是强制执行特定的变化,你都可以获得相同的结果:

resource_type: PhantomData<fn() -> *const T>

然后,既然我们已经修复了类型约束错误,编译器现在会产生关于生命周期的错误:

  • req.clone().lock().unwrap() 不起作用,因为 .lock() 的结果与 req.clone() 的值相关联,但会立即删除。修复是 .clone() 是不必要的,可以删除。

  • p.clone().publish(*req) 不起作用,因为取消引用 MutexLockGuard 不能提供拥有的值,只能提供引用。您可以通过添加 .clone() 来解决此问题。相反,如果您认为 Arc 参数是独占的,您可以按照此处的建议获得所有权:

  • 最后一个生命周期错误有点模糊,因为它与返回的 Future 的生命周期相关联到 req 参数。这可以通过使用 async move { } 来解决,但随后 p 被移出闭包到未来,这意味着它不再是 Fn。你想要的是 移动 req 但移动 克隆 p。你可以这样做:

    move |req: Arc<Mutex<Request>>| {
        let p = p.clone();
        async move {
            // ...
        }
    }
    
  • "现在我正在尝试解决与 await-ing p.publish" 相关的错误 - 错误必须做锁现在在 await 点持续存在,但由于互斥保护没有实现 SendFuture 不能是 Send。您可以通过一步锁定和克隆来解决此问题,因此不会保持锁定:

    let req = req.lock().unwrap().clone();
    p.publish(req);
    Ok(Response {})
    

playground 上查看此编译。仍有许多警告需要解决(未使用 Results),但我希望这能让您走上正确的道路。