深入探讨 Rx.ReplaySubject:如何延迟 `next()`?

Deep diving into Rx.ReplaySubject: How to delay `next()`?

Ignoring Block: Either I'm wrong here, or the whiskey is starting to work. (I don't want to rule out that I'm going stupid either. Sorry for that.)

期望:

我本来希望 ReplaySubject 到 return 每 2 秒一个值,因为我在调用 next() 之前等待两秒(每次)。

结果:

结果是等待了2秒,但是之后所有值都同时输出

有问题的代码在这里:

import { ReplaySubject } from 'rxjs';

export const rs$: ReplaySubject<number> = new ReplaySubject();

rs$.subscribe({
  next: (data) => console.log(data),
  error: (error) => console.warn(error),
  complete: () => console.log('ReplaySubject completed'),
});

const fakeAPIValuesOne: Array<number> = [7, 11, 13];

fakeAPIValuesOne.forEach(async (entry: number) => {
  await wait(2000);  // <--- Why does wait() not work?
  rs$.next(entry);
});

function wait(milliseconds: number) {
  return new Promise((resolve) => setTimeout(resolve, milliseconds));
}

问题:

我在这里做错了什么?

如果你想尝试一下:https://stackblitz.com/edit/rxjs-wpnqvq?file=index.ts

编辑 1:

SetTimeout也没有作用。下面的代码和上面的完全一样:

fakeAPIValuesOne.forEach((value: number) => {
  setTimeout(() => {
    rs$.next(value);
  }, 2000);
});

我想知道 next() 怎么能覆盖这里的所有延迟?

编辑 2

问题已解决,正确答案标明,谢谢!您需要以下详细信息才能 运行 root 级别等待您的 ts 文件。

package.json

请注意类型部分:

{
  "name": "playground",
  "version": "1.0.0",
  "description": "",
  "main": "index.ts",
  "scripts": {
    "start": "nodemon index.ts",
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "MIT",
  "dependencies": {
    "rxjs": "^7.5.5",
    "ts-node": "^10.7.0",
    "typescript": "^4.8.0-dev.20220507"
  },
  "type": "module"
}

nodemon.json

请考虑以下配置以避免错误:TypeError [ERR_UNKNOWN_FILE_EXTENSION]: Unknown file extension ".ts"

{
  "execMap": {
    "ts": "node --loader ts-node/esm"
  }
}

最后但并非最不重要的一点tsconfig.json

{
  "compilerOptions": {
    "module": "ESNext",
    "target": "ESNext",
    "moduleResolution": "node",
    "esModuleInterop": true,
    "allowSyntheticDefaultImports": true,
    "isolatedModules": true,
    "noEmit": true,
    "strict": true,
    "lib": ["esnext", "DOM"]
  }
}

以下注释摘自 mozilla 网络文档 here

Note: forEach expects a synchronous function.

forEach does not wait for promises. Make sure you are aware of the implications while using promises (or async functions) as forEach callback.

所以你这个问题与 ReplaySubject 无关,你不能使用 forEach 这个 use-case。

干杯

编辑:已解决

import { ReplaySubject } from "rxjs";

export const rs$ = new ReplaySubject();

rs$.subscribe({
  next: (data) => console.log(data),
  error: (error) => console.warn(error),
  complete: () => console.log("ReplaySubject completed"),
});

const fakeAPIValuesOne = [7, 11, 13];

// That won't work:
// fakeAPIValuesOne.forEach(async (entry: number) => {
//   await wait(2000);
//   rs$.next(entry);
// });


// That will work
for (const element of fakeAPIValuesOne) {
  await wait(2000);
  rs$.next(element);
}

function wait(milliseconds: number) {
  return new Promise((resolve) => setTimeout(resolve, milliseconds));
}

我认为值得一提的是,您(通常)不应该期望它会起作用。

考虑以下几点:

wait(2000);
wait(2000);
wait(2000);
console.log("Hello");

"Hello" 将立即打印到控制台。它不会等待 6 秒。

如果将它们放在一个循环中,这不会改变。

for (const n of [1,2,3]) {
  wait(2000);
}
console.log("Hello");

这也会立即打印“Hello”。


如果您不使用 .then()await,标准行为是您 不会 等待。某些 API(当然)会代表您执行等待,但预期的行为是它不会。

你需要这样写:

await wait(2000);
await wait(2000);
await wait(2000);
console.log("Hello");

现在您将等待 6 秒,然后将“Hello”打印到控制台。


现在看看这个:

const createPromise = async (v) => {
   await wait(2000);
   console.log("Hello From createPromise: ", v);
}

createPromise(1);
createPromise(2);
createPromise(3);
console.log("Hello");

您会发现这并没有什么不同。这将立即打印 Hello,然后 2 秒后所有的承诺将解决并打印

Hello
// 2 seconds wait
Hello From createPromise: 1
Hello From createPromise: 2
Hello From createPromise: 3

您会注意到 none 他们互相等待。这是承诺的预期行为。您需要使用 await.then 才能实际等待承诺的结果。

你需要写:

await createPromise(1);
await createPromise(2);
await createPromise(3);
console.log("Hello");

这两个循环做同样的事情(正如预期的那样,它们不等待)

for (const n of [1,2,3]) {
  createPromise(n);
}

[1,2,3].forEach(n => createPromise(n))

console.log("Hello");

同样,您首先会看到打印的内容是 "Hello",因为您永远不会等待这些承诺中的任何一个得到解决。

您确实等待内部 wait(2000) 承诺,但您永远不会等待外部 createPromise() 承诺。所以你当然不会等他们:)

for (const n of [1,2,3]) {
  await createPromise(n);
}