Actix SyncArbiter 注册表
Actix SyncArbiter registry
我正在尝试使用 SyncArbiter
实现一个包含 10 个 Redis 连接的池,供不同的参与者使用。假设我们有一个名为 Bob 的 actor,它必须使用 Redis actor 来完成它的任务。
虽然这可以通过以下方式实现:
// crate, use and mod statements have been omitted to lessen clutter
/// FILE main.rs
pub struct AppState {
pub redis: Addr<Redis>,
pub bob: Addr<Bob>
}
fn main() {
let system = actix::System::new("theatre");
server::new(move || {
let redis_addr = SyncArbiter::start(10, || Redis::new("redis://127.0.0.1").unwrap());
let bob_addr = SyncArbiter::start(10, || Bob::new());
let state = AppState {
redis: redis_addr,
bob: bob_addr
};
App::with_state(state).resource("/bob/eat", |r| {
r.method(http::Method::POST)
.with_async(controllers::bob::eat)
})
})
.bind("0.0.0.0:8080")
.unwrap()
.start();
println!("Server started.");
system.run();
}
/// FILE controllers/bob.rs
pub struct Food {
name: String,
kcal: u64
}
pub fn eat(
(req, state): (Json<Food>, State<AppState>),
) -> impl Future<Item = HttpResponse, Error = Error> {
state
.bob
.send(Eat::new(req.into_inner()))
.from_err()
.and_then(|res| match res {
Ok(val) => {
println!("==== BODY ==== {:?}", val);
Ok(HttpResponse::Ok().into())
}
Err(_) => Ok(HttpResponse::InternalServerError().into()),
})
}
/// FILE actors/redis.rs
#[derive(Debug)]
pub struct Redis {
pub client: Client
}
pub struct RunCommand(Cmd);
impl RunCommand {
pub fn new(cmd: Cmd) -> Self {
RunCommand(cmd)
}
}
impl Message for RunCommand {
type Result = Result<RedisResult<String>, ()>;
}
impl Actor for Redis {
type Context = SyncContext<Self>;
}
impl Handler<RunCommand> for Redis {
type Result = Result<RedisResult<String>, ()>;
fn handle(&mut self, msg: RunCommand, _context: &mut Self::Context) -> Self::Result {
println!("Redis received command!");
Ok(Ok("OK".to_string()))
}
}
impl Redis {
pub fn new(url: &str) -> Result<Self, RedisError> {
let client = match Client::open(url) {
Ok(client) => client,
Err(error) => return Err(error)
};
let redis = Redis {
client: client,
};
Ok(redis)
}
}
/// FILE actors/bob.rs
pub struct Bob;
pub struct Eat(Food);
impl Message for Eat {
type Result = Result<Bob, ()>;
}
impl Actor for Eat {
type Context = SyncContext<Self>;
}
impl Handler<Eat> for Bob {
type Result = Result<(), ()>;
fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
println!("Bob received {:?}", &msg);
// How to get a Redis actor and pass data to it here?
Ok(msg.datapoint)
}
}
impl Bob {
pub fn new() -> () {
Bob {}
}
}
从 Bob 的上述 handle 实现来看,不清楚 Bob 如何获得 Redis actor 的地址。或将任何消息发送到 SyncArbiter
中的任何 Actor
运行。
同样可以使用常规 Arbiter
和 Registry
来实现,但据我所知,Actix 不允许多个相同的演员(例如,我们无法开始10 个 Redis actors 使用常规 Arbiter
).
形式化我的问题:
- 有
SyncArbiter
演员Registry
吗
- 我可以定期启动多个相同类型的演员吗
Arbiter
- 是否有更好/更规范的方式来实现连接池
编辑
版本:
- actix 0.7.9
- actix_web 0.7.19
- 期货=“0.1.26”
- 生锈 1.33.0
我自己找到了答案。
开箱即用,无法从注册表中检索带有 SyncContext
的 Actor
。
鉴于我上面的例子。对于演员 Bob
向 Redis
演员发送任何类型的消息,它需要知道 Redis
演员的地址。 Bob
可以显式获取 Redis
的地址 - 包含在发送给它的消息中或从某种共享状态读取。
我想要一个类似于 Erlang 的系统,所以我决定不通过消息传递参与者的地址,因为它看起来太费力、容易出错,而且在我看来,它违背了拥有基于参与者的并发模型的目的(因为没有人演员可以向任何其他演员发送消息)。
因此,我研究了共享状态的想法,并决定实现我自己的 SyncRegistry
,这将类似于 Actix 标准 Registry
- 这正是我想要的,但不是为了SyncContext
.
的演员
这是我编写的天真的解决方案:https://gist.github.com/monorkin/c463f34764ab23af2fd0fb0c19716177
使用以下设置:
fn main() {
let system = actix::System::new("theatre");
let addr = SyncArbiter::start(10, || Redis::new("redis://redis").unwrap());
SyncRegistry::set(addr);
let addr = SyncArbiter::start(10, || Bob::new());
SyncRegistry::set(addr);
server::new(move || {
let state = AppState {};
App::with_state(state).resource("/foo", |r| {
r.method(http::Method::POST)
.with_async(controllers::foo::create)
})
})
.bind("0.0.0.0:8080")
.unwrap()
.start();
println!("Server started.");
system.run();
}
演员 Bob
可以通过以下方式从程序中的任何一点获取 Redis
的地址:
impl Handler<Eat> for Bob {
type Result = Result<(), ()>;
fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
let redis = match SyncRegistry::<Redis>::get() {
Some(redis) => redis,
_ => return Err(())
};
let cmd = redis::cmd("XADD")
.arg("things_to_eat")
.arg("*")
.arg("data")
.arg(&msg.0)
.to_owned();
redis.clone().lock().unwrap().send(RunCommand::new(cmd)).wait().unwrap();
}
}
我正在尝试使用 SyncArbiter
实现一个包含 10 个 Redis 连接的池,供不同的参与者使用。假设我们有一个名为 Bob 的 actor,它必须使用 Redis actor 来完成它的任务。
虽然这可以通过以下方式实现:
// crate, use and mod statements have been omitted to lessen clutter
/// FILE main.rs
pub struct AppState {
pub redis: Addr<Redis>,
pub bob: Addr<Bob>
}
fn main() {
let system = actix::System::new("theatre");
server::new(move || {
let redis_addr = SyncArbiter::start(10, || Redis::new("redis://127.0.0.1").unwrap());
let bob_addr = SyncArbiter::start(10, || Bob::new());
let state = AppState {
redis: redis_addr,
bob: bob_addr
};
App::with_state(state).resource("/bob/eat", |r| {
r.method(http::Method::POST)
.with_async(controllers::bob::eat)
})
})
.bind("0.0.0.0:8080")
.unwrap()
.start();
println!("Server started.");
system.run();
}
/// FILE controllers/bob.rs
pub struct Food {
name: String,
kcal: u64
}
pub fn eat(
(req, state): (Json<Food>, State<AppState>),
) -> impl Future<Item = HttpResponse, Error = Error> {
state
.bob
.send(Eat::new(req.into_inner()))
.from_err()
.and_then(|res| match res {
Ok(val) => {
println!("==== BODY ==== {:?}", val);
Ok(HttpResponse::Ok().into())
}
Err(_) => Ok(HttpResponse::InternalServerError().into()),
})
}
/// FILE actors/redis.rs
#[derive(Debug)]
pub struct Redis {
pub client: Client
}
pub struct RunCommand(Cmd);
impl RunCommand {
pub fn new(cmd: Cmd) -> Self {
RunCommand(cmd)
}
}
impl Message for RunCommand {
type Result = Result<RedisResult<String>, ()>;
}
impl Actor for Redis {
type Context = SyncContext<Self>;
}
impl Handler<RunCommand> for Redis {
type Result = Result<RedisResult<String>, ()>;
fn handle(&mut self, msg: RunCommand, _context: &mut Self::Context) -> Self::Result {
println!("Redis received command!");
Ok(Ok("OK".to_string()))
}
}
impl Redis {
pub fn new(url: &str) -> Result<Self, RedisError> {
let client = match Client::open(url) {
Ok(client) => client,
Err(error) => return Err(error)
};
let redis = Redis {
client: client,
};
Ok(redis)
}
}
/// FILE actors/bob.rs
pub struct Bob;
pub struct Eat(Food);
impl Message for Eat {
type Result = Result<Bob, ()>;
}
impl Actor for Eat {
type Context = SyncContext<Self>;
}
impl Handler<Eat> for Bob {
type Result = Result<(), ()>;
fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
println!("Bob received {:?}", &msg);
// How to get a Redis actor and pass data to it here?
Ok(msg.datapoint)
}
}
impl Bob {
pub fn new() -> () {
Bob {}
}
}
从 Bob 的上述 handle 实现来看,不清楚 Bob 如何获得 Redis actor 的地址。或将任何消息发送到 SyncArbiter
中的任何 Actor
运行。
同样可以使用常规 Arbiter
和 Registry
来实现,但据我所知,Actix 不允许多个相同的演员(例如,我们无法开始10 个 Redis actors 使用常规 Arbiter
).
形式化我的问题:
- 有
SyncArbiter
演员Registry
吗 - 我可以定期启动多个相同类型的演员吗
Arbiter
- 是否有更好/更规范的方式来实现连接池
编辑
版本:
- actix 0.7.9
- actix_web 0.7.19
- 期货=“0.1.26”
- 生锈 1.33.0
我自己找到了答案。
开箱即用,无法从注册表中检索带有 SyncContext
的 Actor
。
鉴于我上面的例子。对于演员 Bob
向 Redis
演员发送任何类型的消息,它需要知道 Redis
演员的地址。 Bob
可以显式获取 Redis
的地址 - 包含在发送给它的消息中或从某种共享状态读取。
我想要一个类似于 Erlang 的系统,所以我决定不通过消息传递参与者的地址,因为它看起来太费力、容易出错,而且在我看来,它违背了拥有基于参与者的并发模型的目的(因为没有人演员可以向任何其他演员发送消息)。
因此,我研究了共享状态的想法,并决定实现我自己的 SyncRegistry
,这将类似于 Actix 标准 Registry
- 这正是我想要的,但不是为了SyncContext
.
这是我编写的天真的解决方案:https://gist.github.com/monorkin/c463f34764ab23af2fd0fb0c19716177
使用以下设置:
fn main() {
let system = actix::System::new("theatre");
let addr = SyncArbiter::start(10, || Redis::new("redis://redis").unwrap());
SyncRegistry::set(addr);
let addr = SyncArbiter::start(10, || Bob::new());
SyncRegistry::set(addr);
server::new(move || {
let state = AppState {};
App::with_state(state).resource("/foo", |r| {
r.method(http::Method::POST)
.with_async(controllers::foo::create)
})
})
.bind("0.0.0.0:8080")
.unwrap()
.start();
println!("Server started.");
system.run();
}
演员 Bob
可以通过以下方式从程序中的任何一点获取 Redis
的地址:
impl Handler<Eat> for Bob {
type Result = Result<(), ()>;
fn handle(&mut self, msg: Eat, _context: &mut Self::Context) -> Self::Result {
let redis = match SyncRegistry::<Redis>::get() {
Some(redis) => redis,
_ => return Err(())
};
let cmd = redis::cmd("XADD")
.arg("things_to_eat")
.arg("*")
.arg("data")
.arg(&msg.0)
.to_owned();
redis.clone().lock().unwrap().send(RunCommand::new(cmd)).wait().unwrap();
}
}