rxjs bindCallback 订阅 mqtt-client message-event 只触发一次,不触发 DataQueryResponse subscriber.next()

rxjs bindCallback Subscription on mqtt-client message-event only triggers once and does not trigger DataQueryResponse subscriber.next()

我目前正在尝试为 Grafana 创建一个流式数据源插件。为此,我调整了 code from the official instructions 以便将来自 MQTT 主题的数据传递给 Grafana。这是官方的工作演示代码,定时输出 frame:

Grafana 指南中的代码

query(options: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
  const streams = options.targets.map(target => {
    const query = defaults(target, defaultQuery);

    return new Observable<DataQueryResponse>(subscriber => {
      const frame = new CircularDataFrame({
        append: 'tail',
        capacity: 1000,
      });

      frame.refId = query.refId;
      frame.addField({ name: 'time', type: FieldType.time });
      frame.addField({ name: 'value', type: FieldType.number });

      const intervalId = setInterval(() => {
        frame.add({ time: Date.now(), value: Math.random() });

        subscriber.next({
          data: [frame],
          key: query.refId,
        });
      }, 100);

      return () => {
        clearInterval(intervalId);
      };
    });
  });

  return merge(...streams);
}

与我的 Mosquitto 代理的连接以及数据的提交和接收工作正常。有问题的是通过 returns a CircularDataFrame.

的订阅者将接收到的数据传递到 Grafana 仪表板

按照中的建议,我使用 rxjs bindCallback 函数来监视消息事件处理程序。这样做时,消息回调函数中的控制台输出会为每条传入消息正确打印。而 clientOnObs 实例订阅中的控制台输出仅在第一条消息上触发,之后不再触发。但是,应该是订阅中的控制台输出也会在每个传入消息上触发,这样我就可以展开 CircularDataFrame,然后通过 subscriber.next() 将其传递到 Grafana Dashboard。我已经尝试交换 bindCallback()this.mqttClient.on() 的顺序,但是 subscribe 部分根本没有输出任何消息。此外,我尝试将 subscriber.next() 调用直接放入 this.mqttClient.on('message') 回调中,但也没有成功。

目标是 subscriber.next() 触发每条传入消息以将新数据传递到仪表板。我需要如何修改 query 方法的实现才能实现此目的?

我改编的代码

query(request: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
    const streams = request.targets.map(target => {
      const query = defaults(target, defaultQuery);

      return new Observable<DataQueryResponse>(subscriber => {
        const frame = new CircularDataFrame({
          append: 'tail',
          capacity: 1000,
        });

        frame.refId = query.refId;
        frame.addField({ name: 'time', type: FieldType.time });
        frame.addField({ name: 'value', type: FieldType.number });

        const clientOnObs = bindCallback(this.mqttClient.on).bind(this.mqttClient);

        this.mqttClient.on('message', (topic: string, message: any) => {
          // this is printed correctly with every message
          console.log(topic, JSON.parse(message.toString()));
        });

        return () => {
          clientOnObs('message').subscribe((payload: any[]) => {
            // const topic: string = payload[0];
            const message = JSON.parse(payload[1].toString());
            
            // This is only printed on the first incoming message
            console.log(message);

            frame.add({ time: Math.floor(message.time) * 1000, value: message.temperature });
            
            // The frame is not passed on at all
            subscriber.next({
              data: [frame],
              key: query.refId,
            });
          });
        };
      });
    });

    return merge(...streams);
  }

bindCallback 是一个函数,用于将基于回调的函数转换为 Obaservable,它发出 一次 然后完成。因此,这可能是仅针对第一条消息触发订阅的原因。

如果我查看您的代码,我会尝试执行以下操作

query(request: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
    const streams = request.targets.map(target => {
      const query = defaults(target, defaultQuery);

      return new Observable<DataQueryResponse>(subscriber => {
        const frame = new CircularDataFrame({
          append: 'tail',
          capacity: 1000,
        });

        frame.refId = query.refId;
        frame.addField({ name: 'time', type: FieldType.time });
        frame.addField({ name: 'value', type: FieldType.number });

        this.mqttClient.on('message', (topic: string, message: any) => {
          // call next on the subscriber here
          const message = JSON.parse(payload[1].toString());
          frame.add({ time: Math.floor(message.time) * 1000, value: message.temperature });
          subscriber.next({
            data: [frame],
            key: query.refId,
          });
        });

        return () => {
          // place here any code that has to run to clean up when the Observable
          // completes. In the graphana official example, for instance, this is 
          // where the interval is cleared
        };
      });
    });

    return merge(...streams);
  }