rxjs bindCallback 订阅 mqtt-client message-event 只触发一次,不触发 DataQueryResponse subscriber.next()
rxjs bindCallback Subscription on mqtt-client message-event only triggers once and does not trigger DataQueryResponse subscriber.next()
我目前正在尝试为 Grafana 创建一个流式数据源插件。为此,我调整了 code from the official instructions 以便将来自 MQTT 主题的数据传递给 Grafana。这是官方的工作演示代码,定时输出 frame
:
Grafana 指南中的代码
query(options: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
const streams = options.targets.map(target => {
const query = defaults(target, defaultQuery);
return new Observable<DataQueryResponse>(subscriber => {
const frame = new CircularDataFrame({
append: 'tail',
capacity: 1000,
});
frame.refId = query.refId;
frame.addField({ name: 'time', type: FieldType.time });
frame.addField({ name: 'value', type: FieldType.number });
const intervalId = setInterval(() => {
frame.add({ time: Date.now(), value: Math.random() });
subscriber.next({
data: [frame],
key: query.refId,
});
}, 100);
return () => {
clearInterval(intervalId);
};
});
});
return merge(...streams);
}
与我的 Mosquitto 代理的连接以及数据的提交和接收工作正常。有问题的是通过 returns a CircularDataFrame
.
的订阅者将接收到的数据传递到 Grafana 仪表板
按照中的建议,我使用 rxjs bindCallback
函数来监视消息事件处理程序。这样做时,消息回调函数中的控制台输出会为每条传入消息正确打印。而 clientOnObs
实例订阅中的控制台输出仅在第一条消息上触发,之后不再触发。但是,应该是订阅中的控制台输出也会在每个传入消息上触发,这样我就可以展开 CircularDataFrame
,然后通过 subscriber.next()
将其传递到 Grafana Dashboard。我已经尝试交换 bindCallback()
和 this.mqttClient.on()
的顺序,但是 subscribe
部分根本没有输出任何消息。此外,我尝试将 subscriber.next()
调用直接放入 this.mqttClient.on('message')
回调中,但也没有成功。
目标是 subscriber.next()
触发每条传入消息以将新数据传递到仪表板。我需要如何修改 query
方法的实现才能实现此目的?
我改编的代码
query(request: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
const streams = request.targets.map(target => {
const query = defaults(target, defaultQuery);
return new Observable<DataQueryResponse>(subscriber => {
const frame = new CircularDataFrame({
append: 'tail',
capacity: 1000,
});
frame.refId = query.refId;
frame.addField({ name: 'time', type: FieldType.time });
frame.addField({ name: 'value', type: FieldType.number });
const clientOnObs = bindCallback(this.mqttClient.on).bind(this.mqttClient);
this.mqttClient.on('message', (topic: string, message: any) => {
// this is printed correctly with every message
console.log(topic, JSON.parse(message.toString()));
});
return () => {
clientOnObs('message').subscribe((payload: any[]) => {
// const topic: string = payload[0];
const message = JSON.parse(payload[1].toString());
// This is only printed on the first incoming message
console.log(message);
frame.add({ time: Math.floor(message.time) * 1000, value: message.temperature });
// The frame is not passed on at all
subscriber.next({
data: [frame],
key: query.refId,
});
});
};
});
});
return merge(...streams);
}
bindCallback
是一个函数,用于将基于回调的函数转换为 Obaservable,它发出 一次 然后完成。因此,这可能是仅针对第一条消息触发订阅的原因。
如果我查看您的代码,我会尝试执行以下操作
query(request: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
const streams = request.targets.map(target => {
const query = defaults(target, defaultQuery);
return new Observable<DataQueryResponse>(subscriber => {
const frame = new CircularDataFrame({
append: 'tail',
capacity: 1000,
});
frame.refId = query.refId;
frame.addField({ name: 'time', type: FieldType.time });
frame.addField({ name: 'value', type: FieldType.number });
this.mqttClient.on('message', (topic: string, message: any) => {
// call next on the subscriber here
const message = JSON.parse(payload[1].toString());
frame.add({ time: Math.floor(message.time) * 1000, value: message.temperature });
subscriber.next({
data: [frame],
key: query.refId,
});
});
return () => {
// place here any code that has to run to clean up when the Observable
// completes. In the graphana official example, for instance, this is
// where the interval is cleared
};
});
});
return merge(...streams);
}
我目前正在尝试为 Grafana 创建一个流式数据源插件。为此,我调整了 code from the official instructions 以便将来自 MQTT 主题的数据传递给 Grafana。这是官方的工作演示代码,定时输出 frame
:
Grafana 指南中的代码
query(options: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
const streams = options.targets.map(target => {
const query = defaults(target, defaultQuery);
return new Observable<DataQueryResponse>(subscriber => {
const frame = new CircularDataFrame({
append: 'tail',
capacity: 1000,
});
frame.refId = query.refId;
frame.addField({ name: 'time', type: FieldType.time });
frame.addField({ name: 'value', type: FieldType.number });
const intervalId = setInterval(() => {
frame.add({ time: Date.now(), value: Math.random() });
subscriber.next({
data: [frame],
key: query.refId,
});
}, 100);
return () => {
clearInterval(intervalId);
};
});
});
return merge(...streams);
}
与我的 Mosquitto 代理的连接以及数据的提交和接收工作正常。有问题的是通过 returns a CircularDataFrame
.
按照bindCallback
函数来监视消息事件处理程序。这样做时,消息回调函数中的控制台输出会为每条传入消息正确打印。而 clientOnObs
实例订阅中的控制台输出仅在第一条消息上触发,之后不再触发。但是,应该是订阅中的控制台输出也会在每个传入消息上触发,这样我就可以展开 CircularDataFrame
,然后通过 subscriber.next()
将其传递到 Grafana Dashboard。我已经尝试交换 bindCallback()
和 this.mqttClient.on()
的顺序,但是 subscribe
部分根本没有输出任何消息。此外,我尝试将 subscriber.next()
调用直接放入 this.mqttClient.on('message')
回调中,但也没有成功。
目标是 subscriber.next()
触发每条传入消息以将新数据传递到仪表板。我需要如何修改 query
方法的实现才能实现此目的?
我改编的代码
query(request: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
const streams = request.targets.map(target => {
const query = defaults(target, defaultQuery);
return new Observable<DataQueryResponse>(subscriber => {
const frame = new CircularDataFrame({
append: 'tail',
capacity: 1000,
});
frame.refId = query.refId;
frame.addField({ name: 'time', type: FieldType.time });
frame.addField({ name: 'value', type: FieldType.number });
const clientOnObs = bindCallback(this.mqttClient.on).bind(this.mqttClient);
this.mqttClient.on('message', (topic: string, message: any) => {
// this is printed correctly with every message
console.log(topic, JSON.parse(message.toString()));
});
return () => {
clientOnObs('message').subscribe((payload: any[]) => {
// const topic: string = payload[0];
const message = JSON.parse(payload[1].toString());
// This is only printed on the first incoming message
console.log(message);
frame.add({ time: Math.floor(message.time) * 1000, value: message.temperature });
// The frame is not passed on at all
subscriber.next({
data: [frame],
key: query.refId,
});
});
};
});
});
return merge(...streams);
}
bindCallback
是一个函数,用于将基于回调的函数转换为 Obaservable,它发出 一次 然后完成。因此,这可能是仅针对第一条消息触发订阅的原因。
如果我查看您的代码,我会尝试执行以下操作
query(request: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
const streams = request.targets.map(target => {
const query = defaults(target, defaultQuery);
return new Observable<DataQueryResponse>(subscriber => {
const frame = new CircularDataFrame({
append: 'tail',
capacity: 1000,
});
frame.refId = query.refId;
frame.addField({ name: 'time', type: FieldType.time });
frame.addField({ name: 'value', type: FieldType.number });
this.mqttClient.on('message', (topic: string, message: any) => {
// call next on the subscriber here
const message = JSON.parse(payload[1].toString());
frame.add({ time: Math.floor(message.time) * 1000, value: message.temperature });
subscriber.next({
data: [frame],
key: query.refId,
});
});
return () => {
// place here any code that has to run to clean up when the Observable
// completes. In the graphana official example, for instance, this is
// where the interval is cleared
};
});
});
return merge(...streams);
}