响应式编程:如何订阅和采样事件发射器?
Reactive Programming: How to subscribe and sample an event emitter?
我正在订阅正在使用 react-native-ble-manager 的 React Native 应用程序中的事件发射器。
handleUpdateValueForCharacteristic(data) {
console.log('Received data from ' + data.peripheral + ' characteristic ' + data.characteristic, data.value);
}
bleManagerEmitter.addListener('BleManagerDidUpdateValueForCharacteristic', this.handleUpdateValueForCharacteristic );
我正在处理频率为每秒 50、100 或 200 个事件 (Hz) 的蓝牙事件流。
我对所有 50 Hz 的事件都很感兴趣,其中一半是 100 Hz,四分之一是 200 Hz。
使用 RxJS 订阅此事件流的正确方法是什么?我应该使用哪个运算符来对数据进行采样?
我可能错了,但我似乎找不到辅助方法来从事件发射器创建可观察对象。
fromEventPattern
应该是您要找的。
它可以让您创建一个基于自定义事件发射的可观察对象(就像您通过这个 BLE 管理器获得的一样)。
我在下面提供了一个片段,概述了您可以如何使用它。
注意 scan()
和 filter()
的组合。通过使用前一个运算符来跟踪第 nth 个事件,它有效地改变了事件被采样的速率,从而被任何订阅者处理。
在您的场景中,您会希望 scan()
也跟踪发出的事件,因此您最终可以在 filter()
调用之后 map()
它,这样订阅者就会收到它。这里的关键点是在累积事件时跟踪 scan()
中的事件状态(即滴答和事件数据属性,代码段中分别为 t
和 data
)。
const { fromEventPattern } = rxjs;
const { filter, map, scan } = rxjs.operators;
// Tweak parameters to vary demo
const hz = 200;
const sample = 4;
function addEmitterHandler(handler) {
// bleManagerEmitter.addListener('event', handler)
const intervalId = setInterval(() => {
handler({ timestamp: Date.now() });
}, 1000 / hz);
return intervalId;
}
function removeEmitterHandler(handler, intervalId) {
// bleManagerEmitter.removeListener(...)
clearInterval(intervalId);
}
// Simulate emissions using the `setInterval()` call
const emitter = fromEventPattern(
addEmitterHandler,
removeEmitterHandler
);
emitter.pipe(
// Use `scan()` and `filter()` combination to adjust sampling
scan((state, data) => {
const t = (state.t % (sample + 1)) + 1;
return { t, data };
}, { t: 0, data: null }),
filter(state => state.t % sample === 0),
// Use `map()` to forward event data only
map(state => state.data),
).subscribe(data => console.log(data));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>
我正在订阅正在使用 react-native-ble-manager 的 React Native 应用程序中的事件发射器。
handleUpdateValueForCharacteristic(data) {
console.log('Received data from ' + data.peripheral + ' characteristic ' + data.characteristic, data.value);
}
bleManagerEmitter.addListener('BleManagerDidUpdateValueForCharacteristic', this.handleUpdateValueForCharacteristic );
我正在处理频率为每秒 50、100 或 200 个事件 (Hz) 的蓝牙事件流。
我对所有 50 Hz 的事件都很感兴趣,其中一半是 100 Hz,四分之一是 200 Hz。 使用 RxJS 订阅此事件流的正确方法是什么?我应该使用哪个运算符来对数据进行采样?
我可能错了,但我似乎找不到辅助方法来从事件发射器创建可观察对象。
fromEventPattern
应该是您要找的。
它可以让您创建一个基于自定义事件发射的可观察对象(就像您通过这个 BLE 管理器获得的一样)。
我在下面提供了一个片段,概述了您可以如何使用它。
注意 scan()
和 filter()
的组合。通过使用前一个运算符来跟踪第 nth 个事件,它有效地改变了事件被采样的速率,从而被任何订阅者处理。
在您的场景中,您会希望 scan()
也跟踪发出的事件,因此您最终可以在 filter()
调用之后 map()
它,这样订阅者就会收到它。这里的关键点是在累积事件时跟踪 scan()
中的事件状态(即滴答和事件数据属性,代码段中分别为 t
和 data
)。
const { fromEventPattern } = rxjs;
const { filter, map, scan } = rxjs.operators;
// Tweak parameters to vary demo
const hz = 200;
const sample = 4;
function addEmitterHandler(handler) {
// bleManagerEmitter.addListener('event', handler)
const intervalId = setInterval(() => {
handler({ timestamp: Date.now() });
}, 1000 / hz);
return intervalId;
}
function removeEmitterHandler(handler, intervalId) {
// bleManagerEmitter.removeListener(...)
clearInterval(intervalId);
}
// Simulate emissions using the `setInterval()` call
const emitter = fromEventPattern(
addEmitterHandler,
removeEmitterHandler
);
emitter.pipe(
// Use `scan()` and `filter()` combination to adjust sampling
scan((state, data) => {
const t = (state.t % (sample + 1)) + 1;
return { t, data };
}, { t: 0, data: null }),
filter(state => state.t % sample === 0),
// Use `map()` to forward event data only
map(state => state.data),
).subscribe(data => console.log(data));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>