Tokio 频道发送,但不接收

Tokio channel sends, but doesn't receive

TL;DR 我正在尝试拥有一个通过 ID 和网络调用控制 ID 的后台线程,并且后台线程似乎并没有通过所有类型的渠道获取消息我试过了。

我已经尝试了 std 频道和 tokio 频道,并且除了来自 tokio 的 watcher 类型之外,我已经尝试了所有频道。所有结果都相同,这可能意味着我在不知不觉中搞砸了某处,但我找不到问题所在:

use std::collections::{
    hash_map::Entry::{Occupied, Vacant},
    HashMap,
};
use std::sync::Arc;
use tokio::sync::mpsc::{self, UnboundedSender};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use uuid::Uuid;
use warp::{http, Filter};

#[derive(Default)]
pub struct Switcher {
    pub handle: Option<JoinHandle<bool>>,
    pub pipeline_end_tx: Option<UnboundedSender<String>>,
}

impl Switcher {
    pub fn set_sender(&mut self, tx: UnboundedSender<String>) {
        self.pipeline_end_tx = Some(tx);
    }
    pub fn set_handle(&mut self, handle: JoinHandle<bool>) {
        self.handle = Some(handle);
    }
}

const ADDR: [u8; 4] = [0, 0, 0, 0];
const PORT: u16 = 3000;

type RunningPipelines = Arc<RwLock<HashMap<String, Arc<RwLock<Switcher>>>>>;

#[tokio::main]
async fn main() {
    let running_pipelines = Arc::new(RwLock::new(HashMap::<String, Arc<RwLock<Switcher>>>::new()));

    let session_create = warp::post()
        .and(with_pipelines(running_pipelines.clone()))
        .and(warp::path("session"))
        .then(|pipelines: RunningPipelines| async move {
            println!("session requested OK!");
            let id = Uuid::new_v4();
            let mut switcher = Switcher::default();
            let (tx, mut rx) = mpsc::unbounded_channel::<String>();
            switcher.set_sender(tx);

            let t = tokio::spawn(async move {
                println!("Background going...");
                //This would be something processing in the background until it received the end signal
                match rx.recv().await {
                    Some(v) => {
                        println!(
                            "Got end message:{} YESSSSSS@!@@!!!!!!!!!!!!!!!!1111eleven",
                            v
                        );
                    }
                    None => println!("Error receiving end signal:"),
                }

                println!("ABORTING HANDLE");

                true
            });

            let ret = HashMap::from([("session_id", id.to_string())]);

            switcher.set_handle(t);

            {
                pipelines
                    .write()
                    .await
                    .insert(id.to_string(), Arc::new(RwLock::new(switcher)));
            }

            Ok(warp::reply::json(&ret))
        });

    let session_end = warp::delete()
        .and(with_pipelines(running_pipelines.clone()))
        .and(warp::path("session"))
        .and(warp::query::<HashMap<String, String>>())
        .then(
            |pipelines: RunningPipelines, p: HashMap<String, String>| async move {
                println!("session end requested OK!: {:?}", p);

                match p.get("session_id") {
                    None => Ok(warp::reply::with_status(
                        "Please specify session to end",
                        http::StatusCode::BAD_REQUEST,
                    )),
                    Some(id) => {
                        let mut pipe = pipelines.write().await;

                        match pipe.entry(String::from(id)) {
                            Occupied(handle) => {
                                println!("occupied");
                                let (k, v) = handle.remove_entry();
                                drop(pipe);
                                println!("removed from hashmap, key:{}", k);
                                let s = v.write().await;
                                if let Some(h) = &s.handle {
                                    if let Some(tx) = &s.pipeline_end_tx {
                                        match tx.send("goodbye".to_string()) {
                                            Ok(res) => {
                                                println!(
                                                "sent end message|{:?}| to fpipeline: {}",
                                                res, id
                                            );
                                            //Added this to try to get it to at least Error on the other side
                                            drop(tx);
                                        },
                                            Err(err) => println!(
                                                "ERROR sending end message to pipeline({}):{}",
                                                id, err
                                            ),
                                        };
                                    } else {
                                        println!("no sender channel found for pipeline: {}", id);
                                    };
                                    h.abort();
                                } else {
                                    println!(
                                        "no luck finding the value in handle in the switcher: {}",
                                        id
                                    );
                                };
                            }
                            Vacant(_) => {
                                println!("no luck finding the handle in the pipelines: {}", id)
                            }
                        };
                        Ok(warp::reply::with_status("done", http::StatusCode::OK))
                    }
                }
            },
        );

    let routes = session_create
        .or(session_end)
        .recover(handle_rejection)
        .with(warp::cors().allow_any_origin());

    println!("starting server...");
    warp::serve(routes).run((ADDR, PORT)).await;
}

async fn handle_rejection(
    err: warp::Rejection,
) -> Result<impl warp::Reply, std::convert::Infallible> {
    Ok(warp::reply::json(&format!("{:?}", err)))
}

fn with_pipelines(
    pipelines: RunningPipelines,
) -> impl Filter<Extract = (RunningPipelines,), Error = std::convert::Infallible> + Clone {
    warp::any().map(move || pipelines.clone())
}

取决于:

[dependencies]
warp = "0.3"
tokio = { version = "1", features = ["full"] }
uuid = { version = "0.8.2", features = ["serde", "v4"] }

我启动时的结果,发送一个“创建”请求,然后发送一个带有接收到的 ID 的“结束”请求:

starting server...
session requested OK!
Background going...
session end requested OK!: {"session_id": "6b984a45-38d8-41dc-bf95-422f75c5a429"}
occupied
removed from hashmap, key:6b984a45-38d8-41dc-bf95-422f75c5a429
sent end message|()| to fpipeline: 6b984a45-38d8-41dc-bf95-422f75c5a429

您会注意到,后台线程在发出“创建”请求时启动(并没有结束),但在发出“结束”请求时启动,而请求中的一切似乎都已成功完成( web) 端,后台线程永远不会收到消息。正如我所说,我已经尝试了所有不同的通道类型并四处移动以使其进入这种配置......即尽可能地或至少可以想到的扁平化和线程安全。我比生锈时更环保,所以任何帮助都将不胜感激!

我认为这里的问题是您正在发送消息,然后立即中止后台任务:

tx.send("goodbye".to_string());
//...
h.abort();

并且后台任务没有时间处理消息,因为中止具有更高的优先级。

您需要的是加入任务,而不是中止它。

奇怪的是,tokio tasks handles 没有 join() 方法,而是等待句柄本身。但是为此你需要拥有句柄,所以首先你必须从 Switcher:

中提取句柄
let mut s = v.write().await;
//steal the task handle
if let Some(h) = s.handle.take() {
   //...
   tx.send("goodbye".to_string());
   //...
   //join the task
   h.await.unwrap();
}

请注意,加入任务可能会失败,以防任务中止或恐慌。我只是对上面的代码感到恐慌,但你可能想做一些不同的事情。

或者……你等不及任务了。在 tokio 中,如果您删除任务句柄,它将被分离。然后,它会在完成时完成。