响应式编程:如何订阅和采样事件发射器?

Reactive Programming: How to subscribe and sample an event emitter?

我正在订阅正在使用 react-native-ble-manager 的 React Native 应用程序中的事件发射器。

handleUpdateValueForCharacteristic(data) {

    console.log('Received data from ' + data.peripheral + ' characteristic ' + data.characteristic, data.value);

  }
bleManagerEmitter.addListener('BleManagerDidUpdateValueForCharacteristic', this.handleUpdateValueForCharacteristic );

我正在处理频率为每秒 50、100 或 200 个事件 (Hz) 的蓝牙事件流。

我对所有 50 Hz 的事件都很感兴趣,其中一半是 100 Hz,四分之一是 200 Hz。 使用 RxJS 订阅此事件流的正确方法是什么?我应该使用哪个运算符来对数据进行采样?

我可能错了,但我似乎找不到辅助方法来从事件发射器创建可观察对象。

fromEventPattern 应该是您要找的。

它可以让您创建一个基于自定义事件发射的可观察对象(就像您通过这个 BLE 管理器获得的一样)。

我在下面提供了一个片段,概述了您可以如何使用它。

注意 scan()filter() 的组合。通过使用前一个运算符来跟踪第 nth 个事件,它有效地改变了事件被采样的速率,从而被任何订阅者处理。

在您的场景中,您会希望 scan() 也跟踪发出的事件,因此您最终可以在 filter() 调用之后 map() 它,这样订阅者就会收到它。这里的关键点是在累积事件时跟踪 scan() 中的事件状态(即滴答和事件数据属性,代码段中分别为 tdata)。

const { fromEventPattern } = rxjs;
const { filter, map, scan } = rxjs.operators;

// Tweak parameters to vary demo
const hz = 200;
const sample = 4;

function addEmitterHandler(handler) {
  // bleManagerEmitter.addListener('event', handler)
  const intervalId = setInterval(() => {
    handler({ timestamp: Date.now() });
  }, 1000 / hz);
  return intervalId;
}

function removeEmitterHandler(handler, intervalId) {
  // bleManagerEmitter.removeListener(...)
  clearInterval(intervalId);
}

// Simulate emissions using the `setInterval()` call
const emitter = fromEventPattern(
  addEmitterHandler,
  removeEmitterHandler
);

emitter.pipe(
  // Use `scan()` and `filter()` combination to adjust sampling
  scan((state, data) => {
    const t = (state.t % (sample + 1)) + 1;
    return { t, data };
  }, { t: 0, data: null }),
  filter(state => state.t % sample === 0),
  // Use `map()` to forward event data only
  map(state => state.data),
).subscribe(data => console.log(data));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>