Actix SyncArbiter 注册表

Actix SyncArbiter registry

我正在尝试使用 SyncArbiter 实现一个包含 10 个 Redis 连接的池,供不同的参与者使用。假设我们有一个名为 Bob 的 actor,它必须使用 Redis actor 来完成它的任务。

虽然这可以通过以下方式实现:

// crate, use and mod statements have been omitted to lessen clutter

/// FILE main.rs
pub struct AppState {
    pub redis: Addr<Redis>,
    pub bob: Addr<Bob>
}

fn main() {
    let system = actix::System::new("theatre");

    server::new(move || {
        let redis_addr = SyncArbiter::start(10, || Redis::new("redis://127.0.0.1").unwrap());
        let bob_addr = SyncArbiter::start(10, || Bob::new());

        let state = AppState {
            redis: redis_addr,
            bob: bob_addr
        };

        App::with_state(state).resource("/bob/eat", |r| {
            r.method(http::Method::POST)
                .with_async(controllers::bob::eat)
        })
    })
    .bind("0.0.0.0:8080")
    .unwrap()
    .start();

    println!("Server started.");

    system.run();
}

/// FILE controllers/bob.rs
pub struct Food {
  name: String,
  kcal: u64
}

pub fn eat(
    (req, state): (Json<Food>, State<AppState>),
) -> impl Future<Item = HttpResponse, Error = Error> {
    state
        .bob
        .send(Eat::new(req.into_inner()))
        .from_err()
        .and_then(|res| match res {
            Ok(val) => {
                println!("==== BODY ==== {:?}", val);
                Ok(HttpResponse::Ok().into())
            }
            Err(_) => Ok(HttpResponse::InternalServerError().into()),
        })
}

/// FILE actors/redis.rs
#[derive(Debug)]
pub struct Redis {
    pub client: Client
}

pub struct RunCommand(Cmd);

impl RunCommand {
    pub fn new(cmd: Cmd) -> Self {
        RunCommand(cmd)
    }
}

impl Message for RunCommand {
    type Result = Result<RedisResult<String>, ()>;
}

impl Actor for Redis {
    type Context = SyncContext<Self>;
}

impl Handler<RunCommand> for Redis {
    type Result = Result<RedisResult<String>, ()>;

    fn handle(&mut self, msg: RunCommand, _context: &mut Self::Context) -> Self::Result {
        println!("Redis received command!");
        Ok(Ok("OK".to_string()))
    }
}

impl Redis {
    pub fn new(url: &str) -> Result<Self, RedisError> {
        let client = match Client::open(url) {
            Ok(client) => client,
            Err(error) => return Err(error)
        };

        let redis = Redis {
            client: client,
        };

        Ok(redis)
    }
}

/// FILE actors/bob.rs
pub struct Bob;

pub struct Eat(Food);

impl Message for Eat {
    type Result = Result<Bob, ()>;
}

impl Actor for Eat {
    type Context = SyncContext<Self>;
}

impl Handler<Eat> for Bob {
    type Result = Result<(), ()>;

    fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
        println!("Bob received {:?}", &msg);

        // How to get a Redis actor and pass data to it here?

        Ok(msg.datapoint)
    }
}

impl Bob {
    pub fn new() -> () {
        Bob {}
    }
}

从 Bob 的上述 handle 实现来看,不清楚 Bob 如何获得 Redis actor 的地址。或将任何消息发送到 SyncArbiter 中的任何 Actor 运行。

同样可以使用常规 ArbiterRegistry 来实现,但据我所知,Actix 不允许多个相同的演员(例如,我们无法开始10 个 Redis actors 使用常规 Arbiter).

形式化我的问题:


编辑

版本:

我自己找到了答案。

开箱即用,无法从注册表中检索带有 SyncContextActor

鉴于我上面的例子。对于演员 BobRedis 演员发送任何类型的消息,它需要知道 Redis 演员的地址。 Bob 可以显式获取 Redis 的地址 - 包含在发送给它的消息中或从某种共享状态读取。

我想要一个类似于 Erlang 的系统,所以我决定不通过消息传递参与者的地址,因为它看起来太费力、容易出错,而且在我看来,它违背了拥有基于参与者的并发模型的目的(因为没有人演员可以向任何其他演员发送消息)。

因此,我研究了共享状态的想法,并决定实现我自己的 SyncRegistry,这将类似于 Actix 标准 Registry - 这正是我想要的,但不是为了SyncContext.

的演员

这是我编写的天真的解决方案:https://gist.github.com/monorkin/c463f34764ab23af2fd0fb0c19716177

使用以下设置:

fn main() {
    let system = actix::System::new("theatre");

    let addr = SyncArbiter::start(10, || Redis::new("redis://redis").unwrap());
    SyncRegistry::set(addr);
    let addr = SyncArbiter::start(10, || Bob::new());
    SyncRegistry::set(addr);


    server::new(move || {
        let state = AppState {};

        App::with_state(state).resource("/foo", |r| {
            r.method(http::Method::POST)
                .with_async(controllers::foo::create)
        })
    })
    .bind("0.0.0.0:8080")
    .unwrap()
    .start();

    println!("Server started.");

    system.run();
}

演员 Bob 可以通过以下方式从程序中的任何一点获取 Redis 的地址:

impl Handler<Eat> for Bob {
    type Result = Result<(), ()>;

    fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
        let redis = match SyncRegistry::<Redis>::get() {
            Some(redis) => redis,
            _ => return Err(())
        };

        let cmd = redis::cmd("XADD")
            .arg("things_to_eat")
            .arg("*")
            .arg("data")
            .arg(&msg.0)
            .to_owned();

        redis.clone().lock().unwrap().send(RunCommand::new(cmd)).wait().unwrap();
    }
}