与 tokio 的 MQTT 连接
MQTT connection with tokio
我正在尝试与 tokio 和 mqtt_v5 crate 中提供的编解码器建立 MQTT 连接。我的代码无法编译,我不明白为什么。这是我目前写的,发送代码可能不正确。
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use tokio_util::codec::Decoder;
use std::net::SocketAddrV4;
use mqtt_v5::types::Packet as MqttPacket;
use mqtt_v5::codec::MqttCodec;
use futures_sink::Sink;
use futures_core::stream::Stream;
struct MqttConn {
inner: Framed<TcpStream, MqttCodec>,
}
impl MqttConn {
async fn new(addr: SocketAddrV4) -> MqttConn {
let tcp = TcpStream::connect(addr).await.expect("cannot connect to mqtt");
MqttConn { inner: MqttCodec::new().framed(tcp) }
}
async fn handle(&self, handler: &dyn Fn(&MqttConn, MqttPacket) -> ()) {
while let Some(p) = self.inner.next().await {
handler(self, p)
}
}
async fn send(&self, p: MqttPacket) {
self.inner.start_send(p);
}
}
我从编译器中得到这些错误:
error[E0599]: no method named `framed` found for struct `MqttCodec` in the current scope
--> src/mqtt.rs:17:44
|
17 | MqttConn { inner: MqttCodec::new().framed(tcp) }
| ^^^^^^ method not found in `MqttCodec`
|
= help: items from traits can only be used if the trait is in scope
= note: the following trait is implemented but not in scope; perhaps add a `use` for it:
`use tokio_util::codec::decoder::Decoder;`
error[E0599]: no method named `next` found for struct `Framed<tokio::net::TcpStream, MqttCodec>` in the current scope
--> src/mqtt.rs:21:40
|
21 | while let Some(p) = self.inner.next().await {
| ^^^^ method not found in `Framed<tokio::net::TcpStream, MqttCodec>`
error[E0599]: no method named `start_send` found for struct `Framed<tokio::net::TcpStream, MqttCodec>` in the current scope
--> src/mqtt.rs:27:20
|
27 | self.inner.start_send(p);
| ^^^^^^^^^^ method not found in `Framed<tokio::net::TcpStream, MqttCodec>`
warning: unused import: `tokio_util::codec::Decoder`
--> src/mqtt.rs:3:5
|
3 | use tokio_util::codec::Decoder;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
warning: unused import: `futures_sink::Sink`
--> src/mqtt.rs:7:5
|
7 | use futures_sink::Sink;
| ^^^^^^^^^^^^^^^^^^
编译器说 Decoder
特性不在范围内,但我使用它。如果我尝试建议的导入,我发现模块 tokio_util::codec::decoder
是私有的。基于来源 tokio_util::codec::decoder::Decoder
被重新导出为 tokio_util::codec::Decoder
。另外,如果我正确阅读了文档,Framed
应该实现 Sink
和 Stream
因此它应该有 next
和 start_send
方法。
来自Cargo.toml的相关行:
[dependencies]
tokio = { version = "1.0.1", features = ["full"] }
tokio-util = { version = "0.6", features = ["full"] }
futures-sink = "0.3.9"
futures-core = "0.3.9"
mqtt-v5 = "0.1.1"
我怎样才能编译它?
您有许多库不兼容导致一些不太明显的错误消息。
mqtt-v5
depends on tokio-util^0.3
,这是为 tokio
0.2 而不是 1.0 编写的。您需要回滚到 tokio
0.2 和 tokio-util
0.3。这应该可以解决 Decoder
和 Sink
.
的问题
另外,Stream
trait只提供了poll_next()
,即任务级流方法。 next()
由 StreamExt
特性提供,以及与 Iterator
.
中类似的其他便捷方法。
我正在尝试与 tokio 和 mqtt_v5 crate 中提供的编解码器建立 MQTT 连接。我的代码无法编译,我不明白为什么。这是我目前写的,发送代码可能不正确。
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use tokio_util::codec::Decoder;
use std::net::SocketAddrV4;
use mqtt_v5::types::Packet as MqttPacket;
use mqtt_v5::codec::MqttCodec;
use futures_sink::Sink;
use futures_core::stream::Stream;
struct MqttConn {
inner: Framed<TcpStream, MqttCodec>,
}
impl MqttConn {
async fn new(addr: SocketAddrV4) -> MqttConn {
let tcp = TcpStream::connect(addr).await.expect("cannot connect to mqtt");
MqttConn { inner: MqttCodec::new().framed(tcp) }
}
async fn handle(&self, handler: &dyn Fn(&MqttConn, MqttPacket) -> ()) {
while let Some(p) = self.inner.next().await {
handler(self, p)
}
}
async fn send(&self, p: MqttPacket) {
self.inner.start_send(p);
}
}
我从编译器中得到这些错误:
error[E0599]: no method named `framed` found for struct `MqttCodec` in the current scope
--> src/mqtt.rs:17:44
|
17 | MqttConn { inner: MqttCodec::new().framed(tcp) }
| ^^^^^^ method not found in `MqttCodec`
|
= help: items from traits can only be used if the trait is in scope
= note: the following trait is implemented but not in scope; perhaps add a `use` for it:
`use tokio_util::codec::decoder::Decoder;`
error[E0599]: no method named `next` found for struct `Framed<tokio::net::TcpStream, MqttCodec>` in the current scope
--> src/mqtt.rs:21:40
|
21 | while let Some(p) = self.inner.next().await {
| ^^^^ method not found in `Framed<tokio::net::TcpStream, MqttCodec>`
error[E0599]: no method named `start_send` found for struct `Framed<tokio::net::TcpStream, MqttCodec>` in the current scope
--> src/mqtt.rs:27:20
|
27 | self.inner.start_send(p);
| ^^^^^^^^^^ method not found in `Framed<tokio::net::TcpStream, MqttCodec>`
warning: unused import: `tokio_util::codec::Decoder`
--> src/mqtt.rs:3:5
|
3 | use tokio_util::codec::Decoder;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
warning: unused import: `futures_sink::Sink`
--> src/mqtt.rs:7:5
|
7 | use futures_sink::Sink;
| ^^^^^^^^^^^^^^^^^^
编译器说 Decoder
特性不在范围内,但我使用它。如果我尝试建议的导入,我发现模块 tokio_util::codec::decoder
是私有的。基于来源 tokio_util::codec::decoder::Decoder
被重新导出为 tokio_util::codec::Decoder
。另外,如果我正确阅读了文档,Framed
应该实现 Sink
和 Stream
因此它应该有 next
和 start_send
方法。
来自Cargo.toml的相关行:
[dependencies]
tokio = { version = "1.0.1", features = ["full"] }
tokio-util = { version = "0.6", features = ["full"] }
futures-sink = "0.3.9"
futures-core = "0.3.9"
mqtt-v5 = "0.1.1"
我怎样才能编译它?
您有许多库不兼容导致一些不太明显的错误消息。
mqtt-v5
depends on tokio-util^0.3
,这是为 tokio
0.2 而不是 1.0 编写的。您需要回滚到 tokio
0.2 和 tokio-util
0.3。这应该可以解决 Decoder
和 Sink
.
另外,Stream
trait只提供了poll_next()
,即任务级流方法。 next()
由 StreamExt
特性提供,以及与 Iterator
.