RxJS - 控制多个并行执行

RxJS - Control multiple parallel executions

我有一个场景,我必须控制多个 PTZ 摄像机,每个摄像机拍摄多个角度的照片。因此,例如:
Camera A 将取角度 A1A2A3
Camera B 将取角度 B1B2B3B4

将相机移动到正确的角度、拍摄图像并上传图像,这些都是 return 承诺的异步功能。
moveCamera( angle )
captureImage()
uploadImage()

摄像机必须并行操作,但每个摄像机的角度必须按顺序进行。

不知何故我觉得这可以通过 RxJS 轻松解决,但我正在努力将它们拼凑在一起。我能做的最好的就是类似于下面的解决方案,它以某种方式使相机按顺序相互处理。请注意,我使用 redux-observable,下面的代码是我在 plain RxJS 中所能做到的最好的。请原谅我的 RxJS。

const angles = {
  'Camera A': [ 'A1', 'A2', 'A3' ],
  'Camera B': [ 'B1', 'B2', 'B3', 'B4' ],
}
const cameras = of( [ 'Camera A', 'Camera B' ] );
const cameraRun = cameras.pipe(
  mergeMap( camera => {
    // in redux-observable, I could return an array here
    return of( angles[ camera ] );
  } )
);
cameraRun.pipe(
  concatMap( angle => {
    return moveCamera( angle )
      .then( () => captureImage() )
      .then( () => uploadImage() )
      .then( () => console.log( 'Image success' ) );
  } )
)

知道redux-observable的人,我有3个epics:
RUN_CAMERA_SET_ROUTINE - 在 mergeMap
内运行所有相机 RUN_CAMERA_ROUTINE - 为 mergeMap
内的每个摄像机运行所有角度 CAPTURE_IMAGE - 在一个 concatMap

中运行上面的异步函数

我最初的想法是 CAPTURE_IMAGE 将是 "grouped" 由于 mergeMap 生成流,但我错了。似乎 CAPTURE_IMAGE 仍然是一个流,在所有摄像机的每个角度排队。

任何指点都会很有帮助。

我会试一试。我在 StackBlitz 中提出了一个解决方案来展示我的想法。在点击按钮之前点击控制台开始新的运行.

关于此解决方案的一些要点:

  • 我只是用start$点了点鼠标就开始了新的运行,对解决方案不重要
  • 我用各种超时模拟了三个摄像头承诺函数,只是为了展示事情是如何按顺序执行的,但两个摄像头是如何并行执行的。
  • 我还为每个相机功能传递了一个 camera 的变量,但这只是为了 console.log() 可以清楚地显示相机在做什么。
  • 我没有对 redux-observable 做任何事情,而是保留原版 rxjs
  • 我使用 concat() 将拍摄照片转换为可观察的序列,而不是像您那样使用承诺链将其保留下来 - 这不是必需的,只是一种不同的处理方式。
  • 我将相机作为单独的 Observable(cameraA$cameraB$)保留,但这也可以通过一组相机来完成。

请随意分叉并更改它以使其更接近您正在寻找的内容。

这是 StackBlitz 中的内容:

import { mergeMap, concatMap, tap } from 'rxjs/operators';
import { fromEvent, from, concat, merge, defer } from 'rxjs';

const moveCamera = (camera, angle) => new Promise(
  (resolve, reject) => { 
    setTimeout(() => {
      console.log(`moved: ${camera} angle: ${angle}`);
      resolve();
    }, 1000) }
);

const captureImage = (camera) => new Promise(
  (resolve, reject) => { 
    setTimeout(() => {
      console.log(`${camera} captured image.`);
      resolve();
    }, 100) }
);

const uploadImage = (camera) => new Promise(
  (resolve, reject) => { 
    setTimeout(() => {
      console.log(`${camera} uploaded image.`);
      resolve();
    }, 2000) }
);


const start$ = fromEvent(document.getElementById('start'), 'click');

const takeAPhoto$ = (camera, angle) => concat(
  defer(() => moveCamera(camera, angle)),
  defer(() => captureImage(camera)),
  defer(() => uploadImage(camera))
);

const cameraA$ = from(['A1', 'A2', 'A3']).pipe(
  concatMap(angle => takeAPhoto$('Camera A', angle))
);

const cameraB$ = from(['B1', 'B2', 'B3', 'B4']).pipe(
  concatMap(angle => takeAPhoto$('Camera B', angle))
);

start$.pipe(
  tap(() => console.log('\n\nstart new run')),
  mergeMap(() => merge(cameraA$, cameraB$)),
).subscribe();

希望对您有所帮助。

你的问题归结为并行执行一些 Observables 并按顺序执行其他 Observables 并从 Promises 创建 Observables。

  1. 要并行执行多个 Observable,请使用:
  • forkJoin,如果你只想在所有相机动作完成后发射最终的 Observable
  • merge,如果你希望最终的 Observable 在每次单个相机动作成功时发射
  1. 使用concat顺序执行多个Observables。

  2. 使用 defer 从 Promise 创建 Observable,但不要立即执行 Promise。

然后你必须

  • 构造一个要按顺序执行的 Observable 数组。
    (单摄像头每个角度的动作)
  • 构造一个要并行执行的 Observable 数组。
    (每个相机的相机动作)。

这可能是纯 RxJS 中的代码

import { concat, forkJoin, merge, defer } from 'rxjs';

const cameras = ['Camera A', 'Camera B'];
const cameraAngles = { 
  'Camera A': ['A1', 'A2', 'A3'], 
  'Camera B': ['B1', 'B2', 'B3', 'B4'] 
}

// Performs a camera action consisting of multiple parts. Returns a Promise.
// camera: e.g. 'Camera A', angle: e.g. 'A1'
const doCameraAction = (camera, angle) => moveCamera(angle)
  .then(() => captureImage())
  .then(() => uploadImage())
  .then(() => console.log('Image success'));

// Creates an Observables that executes multiple camera actions in sequence.
// camera: e.g. 'Camera A', angles: e.g. ['A1', 'A2', 'A3']
const getCameraActionSequence$ = (camera, angles) => concat(
  // the array of Observables we want to execute in sequence
  ...angles.map(angle => defer(() => doCameraAction(camera, angle)))
);

// An Observable that will execute multiple camera action sequences in in parallel
const multiCameraActions$ = forkJoin(
  // the array of Observables we want to execute in parallel
  cameras.map(camera => getCameraActionSequence$(camera, cameraAngles[camera]))
);

https://stackblitz.com/edit/rxjs-gj1dny?file=index.ts