循环中的 Rust 所有权
Rust ownership in loops
我正在尝试在 Rust 中实现 rabbitmq send/listen 功能,我有以下代码:
struct RabbitMQ {
connection: Connection,
}
impl RabbitMQ {
fn connect() -> Self {
RabbitMQ {
connection: the created connection
}
}
}
impl MessageBroker for RabbitMQ {
async fn publish(&self, topic: &Topic) -> Result<PublisherConfirm, Error> {
let channel = self.connection.create_channel().await.unwrap();
RabbitMQ::create_exchange(&channel, &topic.exchange).await;
let payload = topic.message.as_bytes();
let res = channel.basic_publish(
topic.exchange.name.as_str(),
topic.queue.routing_key.as_str(),
topic.exchange.publish_options,
payload.to_vec(),
BasicProperties::default(),
);
res.await
}
}
目前一切顺利!
现在我想在一个 for 循环中发布许多消息而不等待服务器的确认,问题是当我生成 tokio 异步任务时我需要移动我的代理值,这使得它对下一个无效循环迭代:
let broker = RabbitMQ::connect(&connection_details).await;
for x in 1..10 {
tokio::spawn(async move {
let confirm = broker.publish(&my_topic).await.unwrap();
}).await.unwrap();
}
以上代码无法编译并出现以下错误:
error[E0382]: use of moved value: `broker`
--> src/main.rs:47:33
|
21 | let broker = RabbitMQ::connect(&connection_details).await;
| ------ move occurs because `broker` has type `message_broker::RabbitMQ`, which >does not implement the `Copy` trait
...
47 | tokio::spawn(async move {
| _________________________________^
48 | | let confirm = &broker.publish(&enable_cdn).await.unwrap();
| | ------ use occurs due to use in generator
49 | | }).await.unwrap();
| |_________^ value moved here, in previous iteration of loop
我无法实现 Copy 特性,因为 Connection 不是原始的,而且我似乎无法使用引用“ &”给经纪人。
我的问题是如何在不编写 n 发布调用的情况下完成此操作?
您正在使用 async move
块,这意味着块中使用的任何 name 都将移至未来,无论正在执行的操作如何。所以写
&broker.publish
块内部没有区别:首先移动 broker
,未来(当用 .await
轮询时)对其进行内部引用。所以你需要做的是在块外借用,然后将借用的块移动到块内:
let broker = RabbitMQ::connect(&connection_details).await;
for x in 1..10 {
let broker = &broker;
tokio::spawn(async move {
let confirm = broker.publish(&enable_cdn).await.unwrap();
}).await.unwrap();
}
但我 认为 这也行不通:tokio::spawn
没有作用域,所以即使你 await-ing 它,编译器不知道它不会比 broker
长寿。就它而言,tokio 任务可以想活多久就活多久。这意味着你现在可能会得到一个生命周期错误(编译器会假设借用 can 比封闭函数长寿,因此它的起源)。
一个简单的解决方案是将连接放在 Arc
或其他东西后面。
或者,重组您的系统以更好地满足 rabbitmq 的要求:不知道您使用的是哪个,但 amiquip 声明连接是 thread-safe 和 通道 而不是 thread-safe 可以发送到其他线程。
因此,与其 publish-ing 隐式连接,不如在循环的每次迭代中创建一个通道并将 that 移动到任务中,以便实际执行发布.
此外,
Now I want to publish many messages in a for loop without waiting for the confirmation from the server
既然你在等待 tokio::spawn 的结果,你还在这样做吗?
我正在尝试在 Rust 中实现 rabbitmq send/listen 功能,我有以下代码:
struct RabbitMQ {
connection: Connection,
}
impl RabbitMQ {
fn connect() -> Self {
RabbitMQ {
connection: the created connection
}
}
}
impl MessageBroker for RabbitMQ {
async fn publish(&self, topic: &Topic) -> Result<PublisherConfirm, Error> {
let channel = self.connection.create_channel().await.unwrap();
RabbitMQ::create_exchange(&channel, &topic.exchange).await;
let payload = topic.message.as_bytes();
let res = channel.basic_publish(
topic.exchange.name.as_str(),
topic.queue.routing_key.as_str(),
topic.exchange.publish_options,
payload.to_vec(),
BasicProperties::default(),
);
res.await
}
}
目前一切顺利!
现在我想在一个 for 循环中发布许多消息而不等待服务器的确认,问题是当我生成 tokio 异步任务时我需要移动我的代理值,这使得它对下一个无效循环迭代:
let broker = RabbitMQ::connect(&connection_details).await;
for x in 1..10 {
tokio::spawn(async move {
let confirm = broker.publish(&my_topic).await.unwrap();
}).await.unwrap();
}
以上代码无法编译并出现以下错误:
error[E0382]: use of moved value: `broker`
--> src/main.rs:47:33
|
21 | let broker = RabbitMQ::connect(&connection_details).await;
| ------ move occurs because `broker` has type `message_broker::RabbitMQ`, which >does not implement the `Copy` trait
...
47 | tokio::spawn(async move {
| _________________________________^
48 | | let confirm = &broker.publish(&enable_cdn).await.unwrap();
| | ------ use occurs due to use in generator
49 | | }).await.unwrap();
| |_________^ value moved here, in previous iteration of loop
我无法实现 Copy 特性,因为 Connection 不是原始的,而且我似乎无法使用引用“ &”给经纪人。
我的问题是如何在不编写 n 发布调用的情况下完成此操作?
您正在使用 async move
块,这意味着块中使用的任何 name 都将移至未来,无论正在执行的操作如何。所以写
&broker.publish
块内部没有区别:首先移动 broker
,未来(当用 .await
轮询时)对其进行内部引用。所以你需要做的是在块外借用,然后将借用的块移动到块内:
let broker = RabbitMQ::connect(&connection_details).await;
for x in 1..10 {
let broker = &broker;
tokio::spawn(async move {
let confirm = broker.publish(&enable_cdn).await.unwrap();
}).await.unwrap();
}
但我 认为 这也行不通:tokio::spawn
没有作用域,所以即使你 await-ing 它,编译器不知道它不会比 broker
长寿。就它而言,tokio 任务可以想活多久就活多久。这意味着你现在可能会得到一个生命周期错误(编译器会假设借用 can 比封闭函数长寿,因此它的起源)。
一个简单的解决方案是将连接放在 Arc
或其他东西后面。
或者,重组您的系统以更好地满足 rabbitmq 的要求:不知道您使用的是哪个,但 amiquip 声明连接是 thread-safe 和 通道 而不是 thread-safe 可以发送到其他线程。
因此,与其 publish-ing 隐式连接,不如在循环的每次迭代中创建一个通道并将 that 移动到任务中,以便实际执行发布.
此外,
Now I want to publish many messages in a for loop without waiting for the confirmation from the server
既然你在等待 tokio::spawn 的结果,你还在这样做吗?