如何反序列化 Actix 演员中的消息?
How to deserialize messages within Actix actors?
我的意图是通过 WebSockets 接收事件并在 main
的闭包中使用它们。
这在消息是纯文本 (String
) 时有效,但想法是将该文本反序列化为某些结构。
在这个例子中,我只添加了 Data
、Error
和 Event
,但在其他情况下,它可能会有所不同,所以我使用泛型来做到这一点,但我有点迷路了。编译器提出了一些我已经尝试过的建议,但我不知道如何 "force" 将消息转换为特定类型(本例中为 Data
,但 EventManager
可以用在其他部分,所以应该是通用的)。
我附上了这段代码,它试图展示我的想法,尽管它无法编译:
events.rs
:
use actix::*;
use actix_web::ws::{Client, Message, ProtocolError};
use futures::Future;
use serde::de;
use serde_json::from_str;
struct MyActor<T> {
manager: EventManager<T>,
}
impl<T: 'static> Actor for MyActor<T> {
type Context = Context<Self>;
}
impl<T: 'static> StreamHandler<Message, ProtocolError> for MyActor<T> {
fn handle(&mut self, msg: Message, _ctx: &mut Context<Self>) {
match msg {
Message::Text(text) => {
debug!("Received {}", text);
for idx in 0..self.manager.events.len() {
let data =
from_str(&text).expect(&format!("Error when deserializing {:?}", text));
(self.manager.events[idx].handler)(data)
}
}
_ => panic!(),
}
}
}
pub struct Event<T> {
handler: Box<Fn(T) + 'static>,
}
pub struct EventManager<T> {
events: Vec<Event<T>>,
}
impl<T: 'static> EventManager<T>
where
T: serde::Deserialize<'static>,
{
pub fn new() -> Self {
Self { events: vec![] }
}
pub fn capture<F>(&mut self, function: F)
where
F: for<'h> Fn(T) + 'static,
{
let event = Event {
handler: Box::new(function),
};
self.events.push(event);
}
pub fn run(self) {
let runner = System::new("example");
debug!("run");
Arbiter::spawn(
Client::new("example")
.connect()
.map(|(reader, _writer)| {
MyActor::create(|ctx| {
MyActor::add_stream(reader, ctx);
MyActor { manager: self }
});
})
.map_err(|err| {}),
);
runner.run();
}
}
main.rs
:
#[macro_use]
extern crate log;
extern crate actix;
extern crate actix_web;
extern crate env_logger;
extern crate futures;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
pub mod events;
use actix::*;
use serde::de;
use serde::de::{Deserialize, Deserializer};
use events::EventManager;
#[derive(Debug, Message, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Data {
Error(Error),
Event(Event),
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Error {
message: String,
code: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Event {
name: String,
content: String,
}
fn main() {
env_logger::init();
let mut client = EventManager::<Data>new();
client.capture(|data| debug!("event: {:?}", data));
client.run();
}
有一些修复可以让它编译。
让它编译的技巧是使用 Higher-Rank Trait Bounds (HTRB) 特征边界而不是声明 'static
生命周期。
遵循编译器建议并绑定 T: serde::Deserialize<'_>
特性:
impl<T> StreamHandler<Message, ProtocolError> for MyActor<T>
where
for<'de> T: serde::Deserialize<'de> + 'static,
然后还更改与 EventManager
impl 关联的 Deserialize<'static>
特征绑定和 HTRB 特征绑定,使其与 StreamHandler
实现的要求兼容:
impl<T: 'static> EventManager<T>
where
for<'de> T: serde::Deserialize<'de>,
最后,如果您更正了使用正确语法创建客户端的行:
let mut client: EventManager<Data> = EventManager::new();
示例代码应该可以编译。
注意:对于 capture
,使用更高的特征边界来声明 Fn
要求是多余的,只需:
pub fn capture<F>(&mut self, function: F)
where
F: Fn(T) + 'static,
我的意图是通过 WebSockets 接收事件并在 main
的闭包中使用它们。
这在消息是纯文本 (String
) 时有效,但想法是将该文本反序列化为某些结构。
在这个例子中,我只添加了 Data
、Error
和 Event
,但在其他情况下,它可能会有所不同,所以我使用泛型来做到这一点,但我有点迷路了。编译器提出了一些我已经尝试过的建议,但我不知道如何 "force" 将消息转换为特定类型(本例中为 Data
,但 EventManager
可以用在其他部分,所以应该是通用的)。
我附上了这段代码,它试图展示我的想法,尽管它无法编译:
events.rs
:
use actix::*;
use actix_web::ws::{Client, Message, ProtocolError};
use futures::Future;
use serde::de;
use serde_json::from_str;
struct MyActor<T> {
manager: EventManager<T>,
}
impl<T: 'static> Actor for MyActor<T> {
type Context = Context<Self>;
}
impl<T: 'static> StreamHandler<Message, ProtocolError> for MyActor<T> {
fn handle(&mut self, msg: Message, _ctx: &mut Context<Self>) {
match msg {
Message::Text(text) => {
debug!("Received {}", text);
for idx in 0..self.manager.events.len() {
let data =
from_str(&text).expect(&format!("Error when deserializing {:?}", text));
(self.manager.events[idx].handler)(data)
}
}
_ => panic!(),
}
}
}
pub struct Event<T> {
handler: Box<Fn(T) + 'static>,
}
pub struct EventManager<T> {
events: Vec<Event<T>>,
}
impl<T: 'static> EventManager<T>
where
T: serde::Deserialize<'static>,
{
pub fn new() -> Self {
Self { events: vec![] }
}
pub fn capture<F>(&mut self, function: F)
where
F: for<'h> Fn(T) + 'static,
{
let event = Event {
handler: Box::new(function),
};
self.events.push(event);
}
pub fn run(self) {
let runner = System::new("example");
debug!("run");
Arbiter::spawn(
Client::new("example")
.connect()
.map(|(reader, _writer)| {
MyActor::create(|ctx| {
MyActor::add_stream(reader, ctx);
MyActor { manager: self }
});
})
.map_err(|err| {}),
);
runner.run();
}
}
main.rs
:
#[macro_use]
extern crate log;
extern crate actix;
extern crate actix_web;
extern crate env_logger;
extern crate futures;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
pub mod events;
use actix::*;
use serde::de;
use serde::de::{Deserialize, Deserializer};
use events::EventManager;
#[derive(Debug, Message, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Data {
Error(Error),
Event(Event),
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Error {
message: String,
code: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Event {
name: String,
content: String,
}
fn main() {
env_logger::init();
let mut client = EventManager::<Data>new();
client.capture(|data| debug!("event: {:?}", data));
client.run();
}
有一些修复可以让它编译。
让它编译的技巧是使用 Higher-Rank Trait Bounds (HTRB) 特征边界而不是声明 'static
生命周期。
遵循编译器建议并绑定 T: serde::Deserialize<'_>
特性:
impl<T> StreamHandler<Message, ProtocolError> for MyActor<T>
where
for<'de> T: serde::Deserialize<'de> + 'static,
然后还更改与 EventManager
impl 关联的 Deserialize<'static>
特征绑定和 HTRB 特征绑定,使其与 StreamHandler
实现的要求兼容:
impl<T: 'static> EventManager<T>
where
for<'de> T: serde::Deserialize<'de>,
最后,如果您更正了使用正确语法创建客户端的行:
let mut client: EventManager<Data> = EventManager::new();
示例代码应该可以编译。
注意:对于 capture
,使用更高的特征边界来声明 Fn
要求是多余的,只需:
pub fn capture<F>(&mut self, function: F)
where
F: Fn(T) + 'static,