与 tokio 的 MQTT 连接

MQTT connection with tokio

我正在尝试与 tokio 和 mqtt_v5 crate 中提供的编解码器建立 MQTT 连接。我的代码无法编译,我不明白为什么。这是我目前写的,发送代码可能不正确。

use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use tokio_util::codec::Decoder;
use std::net::SocketAddrV4;
use mqtt_v5::types::Packet as MqttPacket;
use mqtt_v5::codec::MqttCodec;
use futures_sink::Sink;
use futures_core::stream::Stream;

struct MqttConn {
    inner: Framed<TcpStream, MqttCodec>,
}

impl MqttConn {
    async fn new(addr: SocketAddrV4) -> MqttConn {
        let tcp = TcpStream::connect(addr).await.expect("cannot connect to mqtt");
        MqttConn { inner: MqttCodec::new().framed(tcp) } 
    }

    async fn handle(&self, handler: &dyn Fn(&MqttConn, MqttPacket) -> ()) {
        while let Some(p) = self.inner.next().await {
            handler(self, p)
        }
    }

    async fn send(&self, p: MqttPacket) {
        self.inner.start_send(p);
    }
}

我从编译器中得到这些错误:

error[E0599]: no method named `framed` found for struct `MqttCodec` in the current scope
  --> src/mqtt.rs:17:44
   |
17 |         MqttConn { inner: MqttCodec::new().framed(tcp) } 
   |                                            ^^^^^^ method not found in `MqttCodec`
   |
   = help: items from traits can only be used if the trait is in scope
   = note: the following trait is implemented but not in scope; perhaps add a `use` for it:
           `use tokio_util::codec::decoder::Decoder;`

error[E0599]: no method named `next` found for struct `Framed<tokio::net::TcpStream, MqttCodec>` in the current scope
  --> src/mqtt.rs:21:40
   |
21 |         while let Some(p) = self.inner.next().await {
   |                                        ^^^^ method not found in `Framed<tokio::net::TcpStream, MqttCodec>`

error[E0599]: no method named `start_send` found for struct `Framed<tokio::net::TcpStream, MqttCodec>` in the current scope
  --> src/mqtt.rs:27:20
   |
27 |         self.inner.start_send(p);
   |                    ^^^^^^^^^^ method not found in `Framed<tokio::net::TcpStream, MqttCodec>`

warning: unused import: `tokio_util::codec::Decoder`
 --> src/mqtt.rs:3:5
  |
3 | use tokio_util::codec::Decoder;
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^

warning: unused import: `futures_sink::Sink`
 --> src/mqtt.rs:7:5
  |
7 | use futures_sink::Sink;
  |     ^^^^^^^^^^^^^^^^^^

编译器说 Decoder 特性不在范围内,但我使用它。如果我尝试建议的导入,我发现模块 tokio_util::codec::decoder 是私有的。基于来源 tokio_util::codec::decoder::Decoder 被重新导出为 tokio_util::codec::Decoder。另外,如果我正确阅读了文档,Framed 应该实现 SinkStream 因此它应该有 nextstart_send 方法。

来自Cargo.toml的相关行:

[dependencies]
tokio = { version = "1.0.1", features = ["full"] }
tokio-util = { version = "0.6", features = ["full"] }
futures-sink = "0.3.9"
futures-core = "0.3.9"

mqtt-v5 = "0.1.1"

我怎样才能编译它?

您有许多库不兼容导致一些不太明显的错误消息。

mqtt-v5 depends on tokio-util^0.3,这是为 tokio 0.2 而不是 1.0 编写的。您需要回滚到 tokio 0.2 和 tokio-util 0.3。这应该可以解决 DecoderSink.

的问题

另外,Stream trait只提供了poll_next(),即任务级流方法。 next()StreamExt 特性提供,以及与 Iterator.

中类似的其他便捷方法。