rxjs 运算符从源收集字符串并根据模式部分发射它们

rxjs operators to collect strings from a source and partially emit them according to a pattern

所以我在支持蓝牙的项目上使用 typescript/RXJS/React-Native;我有一个函数可以从给定的外围设备接收字符串,但是有一些我无法真正避免从这个外围设备中避免的警告。

首先,它通过特定的命令模式与我交流;也就是说,每个命令都是 /[a-zA-Z][^;]*;/ 的形式(也就是一个字母字符,后跟任意数量的字符,以分号结尾)。

这些命令后面可能有也可能没有应该被忽略的空格。此外,这些命令可能会也可能不会被串联发送:a1234;bFGe4; 将是在同一消息中发送的两个命令 a1234;bFGe4;。但是,有一个警告:如果命令足够长,则它可能不完整。例如,我可能会收到两条消息 c444a;X1321241224124311234124;,这应该被翻译成两个单独的命令 c444a;X1321241224124311234124;。这是由于硬件限制。

我设法使用一个将最后接收到的字符串的结尾保留在自身中的可观察对象来处理这种情况:

const ANY_MSG = /[a-zA-Z][^;]*;/g

const messages$ = new Observable<string>(sub => {
  let previousMsg = "";

  // monitor messages is the function w/ a callback that receives messages from the device
  monitorMessages((err, msg) => {
    if (err) {
      return sub.error(err);
    }

    const currMsg = previousMsg + msg
    const matches = currMsg.match(ANY_MSG)

    let lastIndex = 0
    if(matches) {
      for(const match of matches) {
        sub.next(match)
        lastIndex += match.length
      }
    }
    previousMsg = currMsg.slice(lastIndex)
  });
});

这个 observable 按照我的预期发出我的消息:如果它们被 monitorMessages 函数部分接收,它们用分号分隔并连接起来。

问题是,我觉得这个函数很难阅读和理解,如果它是通过来自 RXJS 的 piped 函数组成的会更好。换句话说,我想按照这些思路做一些事情:

const messages$ = bindNodeCallback(monitorMessages).pipe(
  // ??????
);

但我无法弄清楚我需要在那里应用哪些运算符(或者即使我需要自己编写),也会出现以下情况:

这甚至可以完全通过 RXJS 运算符实现吗?也许我需要创建一个新的 observable 作为第二部分的 "storage" ?我只是觉得当前的方法(为 Observable 创建某种内部状态)非常奇怪并且相对难以理解 atm。

我认为你的实现很好。然而,如果你真的想用操作符来实现它,你可以使用 scan 在组合的可观察链中维护一些状态,像这样:

const messages$ = bindNodeCallback(monitorMessages).pipe(
  scan((acc, received) => {
    const data = acc.remainder + received;
    const messages = data.match(ANY_MSG);
    if (messages) {
      const length = messsages.reduce((total, message) => total + message.length, 0);
      return { messages, remainder: data.slice(length) };
    }
    return { messages: [], remainder: data };
  }, { messages: [], remainder: "" }),
  mergeMap(({ messages }) => messages)
);

要发出您的消息数组,您可以使用 mergeMap,返回数组 - 因为数组是 ObservableInput.