无法将“Arc”中的数据借用为可变数据

Cannot borrow data in an `Arc` as mutable

我不知道接下来该怎么做。看来要么是我理解错了什么，要么是我还没有掌握某个关键的知识点。

use std::sync::Arc;

use reqwest::{Error, Response}; // 0.11.4
use tokio::sync::mpsc::{self, Receiver, Sender}; // 1.9.0

pub struct Task { // One unit of work: an id plus the URL to request.
    pub id: u32, // Identifier carried along with the task; never read in the code shown here.
    pub url: String, // Address passed to `client.get(..)` when the task is run.
}
pub enum Message { // Outcome of one `Task`, sent through the `res_tx` channel.
    Failure(Task, Error), // The request returned `Err`; carries the task and the `reqwest::Error`.
    Success(Task, Response), // The request returned `Ok`; carries the task and the `reqwest::Response`.
}

struct State { // Everything the spawned tasks reach through `Arc<State>`.
    client: reqwest::Client, // HTTP client used to issue each task's GET request.
    res_tx: Sender<Message>, // Cloned once per task so the task can report its `Message`.
    res_rx: Receiver<Message>, // Receiving half; calling `recv()` on it through the `Arc` is what fails with E0596.
}

pub struct Proxy { // Owns the shared `State` plus the limits and channel handed to `new`.
    state: Arc<State>, // Cloned into the tasks spawned by `start_chin` / `start_chres`.
    max_rps: u16, // NOTE(review): presumably a requests-per-second cap; never read in this snippet.
    max_pending: u16, // Sizes the task and result channels (capacity is `max_pending + 1`).
    id: u32, // Stored as given to `new`; not used elsewhere in this snippet.
    parent_tx: Sender<String>, // Stored as given to `new`; not used elsewhere in this snippet.
}

// Sends `msg` on `tx` and waits for the send to complete.
// A failed send is not returned to the caller: the error is only
// printed to stderr.
async fn send_msg<T>(tx: &Sender<T>, msg: T) {
    if let Err(error) = tx.send(msg).await {
        eprintln!("{}", error)
    }
}

impl Proxy {
    // Spawns the input loop: every received `Task` gets its own request task; returns the sending half.
    async fn start_chin(&mut self) -> Sender<Task> {
        let (chin_tx, mut chin_rx) = mpsc::channel::<Task>(self.max_pending as usize + 1 as usize);
        let state_outer = self.state.clone(); // clones the `Arc`, so the spawned loop owns its own handle
        // The loop below owns `chin_rx` itself (declared `mut` above), so `recv()` compiles here.
        tokio::spawn(async move {
            loop {
                match chin_rx.recv().await {
                    Some(task) => {
                        let res_tx = state_outer.res_tx.clone();
                        let state = state_outer.clone();
                        tokio::spawn(async move { // one spawned task per request
                            match state.client.get(&task.url).send().await {
                                Ok(res) => send_msg(&res_tx, Message::Success(task, res)).await,
                                Err(err) => send_msg(&res_tx, Message::Failure(task, err)).await,
                            }
                        });
                    }
                    None => (), // NOTE(review): if `None` means the channel is closed, this loop never exits — confirm
                }
            }
        });
        chin_tx
    }
    // Spawns the result loop over `state.res_rx`; does not compile as written (E0596 on the marked line).
    async fn start_chres(&self) {
        let state = self.state.clone();
        // `state` is an `Arc<State>`, which only gives shared (immutable) access to `res_rx`.
        tokio::spawn(async move {
            loop {
                match state.res_rx.recv().await { // LINE PRODUCES ERROR: E0596, `recv()` needs a mutable borrow of `res_rx`
                    Some(task) => {}
                    None => (),
                }
            }
        });
    }
}

impl Proxy {
    /// Creates a `Proxy` whose HTTP client optionally goes through `proxy_addr`.
    ///
    /// * `id`, `parent_tx`, `max_rps` and `max_pending` are stored on the
    ///   returned value unchanged.
    /// * `proxy_addr` is passed to `reqwest::Proxy::all`, unless it is the
    ///   literal string `"none"`, in which case no proxy is configured.
    ///
    /// The result channel is created with capacity `max_pending + 1`.
    ///
    /// # Errors
    ///
    /// Returns the `reqwest::Error` produced by `reqwest::Proxy::all` or by
    /// building the client.
    pub fn new(
        id: u32,
        parent_tx: Sender<String>,
        proxy_addr: &str,
        max_rps: u16,
        max_pending: u16,
    ) -> Result<Self, Error> {
        // Must be `mut`: the builder is re-assigned when a proxy is configured
        // (without it the assignment below is rejected with E0384).
        let mut client = reqwest::Client::builder();
        if proxy_addr != "none" {
            client = client.proxy(reqwest::Proxy::all(proxy_addr)?);
        }
        // `u16 -> usize` is lossless, so `usize::from` replaces the `as` casts.
        let (res_tx, res_rx) = mpsc::channel::<Message>(usize::from(max_pending) + 1); // TODO: check size

        Ok(Proxy {
            id,
            state: Arc::new(State {
                client: client.build()?,
                res_tx,
                res_rx,
            }),
            max_rps,
            max_pending,
            parent_tx,
        })
    }
}
error[E0596]: cannot borrow data in an `Arc` as mutable
  --> src/lib.rs:69:23
   |
69 |                 match state.res_rx.recv().await {
   |                       ^^^^^^^^^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<State>`
use std::sync::Arc;

struct Something { // tiny counter type used by the example
    size: usize,
}

impl Something {
    fn increase(&mut self) { // `&mut self`: the caller needs exclusive access
        self.size += 1;
    }
}

fn main() {
    let something = Something{size: 1};
    let arc = Arc::new(something); // `arc` is an `Arc<Something>`
    arc.increase(); // E0596: `increase` takes `&mut self`, but `DerefMut` is not implemented for `Arc<Something>`
}

会产生如下错误：

error[E0596]: cannot borrow data in an `Arc` as mutable
  --> src/main.rs:16:5
   |
16 |     arc.increase();
   |     ^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<Something>`

error: aborting due to previous error; 1 warning emitted

这是因为代码试图以可变方式借用 `arc`。要做到这一点，`Arc` 必须实现 `DerefMut`；但它并没有实现，因为 `Arc` 的设计本意就是不可变的共享。

把对象包裹在 `Mutex` 中即可解决这个问题：

use std::sync::{Arc, Mutex};

struct Something { // tiny counter type used by the example
    size: usize,
}

impl Something {
    fn increase(&mut self) { // `&mut self`: the caller needs exclusive access
        self.size += 1;
    }
}

fn main() {
    // The `Mutex` supplies the mutable access that a bare `Arc` does not give.
    let arc = Arc::new(Mutex::new(Something { size: 1 }));
    // Locking yields a guard through which `&mut self` methods can be called.
    let mut guard = arc.lock().unwrap();
    guard.increase();
}

现在这个对象既可以被共享，也可以被修改（递增）了。

Lucas Zanella 的回答和 Shepmaster 的评论对我重构和简化代码帮助很大。我决定在 Proxy::new() 函数内部直接转移所有权（把各个接收端移动到各自的任务中），而不是使用共享引用。这样代码的可读性更好，也避免了对需要可变访问的 tokio::sync::mpsc::Receiver 进行共享引用。也许这个问题本身提得过于宽泛，但多亏了社区，我找到了一种新的思路。重构后的代码如下。

use reqwest::{Client, Error, Response};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Sender, Receiver};


/// One unit of work for the proxy: a URL to request, tagged with an id.
pub struct Task {
    /// Identifier carried along with the task; not read by the code in this file section.
    pub id: u32,
    /// Address passed to `Client::get` when the task is executed.
    pub url: String,
}
/// Outcome of executing one [`Task`], passed from the request tasks to the
/// response loop.
pub enum Message {
    /// Sending the request returned an error; carries the task and that error.
    Failure(Task, Error),
    /// Sending the request returned a response; carries the task and that response.
    Success(Task, Response),
}
/// Handle returned by `Proxy::new`; holds the sending half of the task channel.
pub struct Proxy {
    /// Stored as given to `new`; not read elsewhere in this file section.
    id: u32,
    /// NOTE(review): presumably a requests-per-second limit; stored but not yet enforced here.
    max_rps: u16,
    /// Value from which both channel capacities (`max_pending + 1`) are derived in `new`.
    max_pending: u16,
    /// Sending half of the task channel; cloned out by `get_in_tx`.
    in_tx: Sender<Task>,
}


/// Sends `msg` on `tx` and waits for the send to finish.
///
/// A failed send is not reported to the caller; the error is written to
/// stderr and otherwise ignored.
async fn send_msg<T>(tx: &Sender<T>, msg: T) {
    if let Err(error) = tx.send(msg).await {
        eprintln!("{}", error);
    }
}


/// Receives tasks from `in_rx` and runs each one as its own spawned request.
///
/// For every received [`Task`], a new Tokio task issues a GET request to
/// `task.url` with a clone of `client` and reports the outcome on `res_tx`
/// as [`Message::Success`] or [`Message::Failure`].
///
/// Returns as soon as `in_rx.recv()` yields `None` (per Tokio's documentation
/// this means the channel is closed and empty). Returning drops this
/// function's `res_tx`.
async fn start_loop_in(client: Client, mut in_rx: Receiver<Task>, res_tx: Sender<Message>) {
    // `while let` ends the loop on `None`. The previous `loop { if let .. }`
    // form kept calling `recv()` forever once it started returning `None`.
    while let Some(task) = in_rx.recv().await {
        // Each spawned task needs its own owned handles.
        let client = client.clone();
        let res_tx = res_tx.clone();
        tokio::spawn(async move {
            println!("SENDING: {}", &task.url); // TODO: DELETE DEBUG
            match client.get(&task.url).send().await {
                Ok(res) => send_msg(&res_tx, Message::Success(task, res)).await,
                Err(err) => send_msg(&res_tx, Message::Failure(task, err)).await,
            }
        });
    }
}


/// Receives request outcomes from `res_rx` and forwards a textual report for
/// each one on `out_tx`.
///
/// * [`Message::Success`]: the response body is read as text and forwarded,
///   formatted with `{:#?}`. If reading the body fails, the error's text is
///   forwarded instead, so one bad response no longer panics and stops the loop.
/// * [`Message::Failure`]: the error's text is forwarded.
///
/// Returns as soon as `res_rx.recv()` yields `None` (per Tokio's documentation
/// this means the channel is closed and empty).
async fn start_loop_res(mut res_rx: Receiver<Message>, out_tx: Sender<String>) {
    // `while let` ends the loop on `None`. The previous `loop { if let .. }`
    // form kept calling `recv()` forever once it started returning `None`.
    while let Some(message) = res_rx.recv().await {
        let report = match message {
            Message::Success(_task, res) => match res.text().await {
                // TODO: change in release! (`{:#?}` debug-formats the body)
                Ok(body) => format!("{:#?}", body),
                // Previously `.unwrap()`: report the failure like any other error.
                Err(err) => err.to_string(),
            },
            Message::Failure(_task, err) => err.to_string(),
        };
        send_msg(&out_tx, report).await;
    }
}


impl Proxy{

    pub fn new(id: u32, parent_tx: Sender<String>, proxy_addr: &str, max_rps: u16, max_pending: u16) -> Result<Self, Error> {
        
        let mut client = Client::builder();
        if proxy_addr != "none" { client = client.proxy(reqwest::Proxy::all(proxy_addr)?) }
        let (res_tx, res_rx) = mpsc::channel::<Message>(max_pending as usize + 1 as usize); // TODO: check size

        let client = client.build()?;
        let (in_tx, in_rx) = mpsc::channel::<Task>(max_pending as usize + 1 as usize);
        let res_tx_clone = res_tx.clone();
        tokio::spawn(async move { start_loop_in(client, in_rx, res_tx_clone).await });

        tokio::spawn(async move { start_loop_res(res_rx, parent_tx).await });
        
        Ok(Proxy{
            id,
            max_rps,
            max_pending,
            in_tx,
        })
    }

    pub fn get_in_tx(&self) -> Sender<Task> {
        self.in_tx.clone()
    }
}