RxJS - 控制多个并行执行
RxJS - Control multiple parallel executions
我有一个场景,我必须控制多个 PTZ 摄像机,每个摄像机拍摄多个角度的照片。因此,例如:
Camera A
将取角度 A1
、A2
、A3
Camera B
将取角度 B1
、B2
、B3
、B4
将相机移动到正确的角度、拍摄图像并上传图像,这些都是 return 承诺的异步功能。
moveCamera( angle )
captureImage()
uploadImage()
摄像机必须并行操作,但每个摄像机的角度必须按顺序进行。
不知何故我觉得这可以通过 RxJS 轻松解决,但我正在努力将它们拼凑在一起。我能做的最好的就是类似于下面的解决方案,它以某种方式使相机按顺序相互处理。请注意,我使用 redux-observable
,下面的代码是我在 plain RxJS 中所能做到的最好的。请原谅我的 RxJS。
const angles = {
'Camera A': [ 'A1', 'A2', 'A3' ],
'Camera B': [ 'B1', 'B2', 'B3', 'B4' ],
}
const cameras = of( [ 'Camera A', 'Camera B' ] );
const cameraRun = cameras.pipe(
mergeMap( camera => {
// in redux-observable, I could return an array here
return of( angles[ camera ] );
} )
);
cameraRun.pipe(
concatMap( angle => {
return moveCamera( angle )
.then( () => captureImage() )
.then( () => uploadImage() )
.then( () => console.log( 'Image success' ) );
} )
)
知道redux-observable
的人,我有3个epics:
RUN_CAMERA_SET_ROUTINE
- 在 mergeMap
内运行所有相机
RUN_CAMERA_ROUTINE
- 为 mergeMap
内的每个摄像机运行所有角度
CAPTURE_IMAGE
- 在一个 concatMap
中运行上面的异步函数
我最初的想法是 CAPTURE_IMAGE
将是 "grouped" 由于 mergeMap
生成流,但我错了。似乎 CAPTURE_IMAGE
仍然是一个流,在所有摄像机的每个角度排队。
任何指点都会很有帮助。
我会试一试。我在 StackBlitz 中提出了一个解决方案来展示我的想法。在点击按钮之前点击控制台开始新的运行.
关于此解决方案的一些要点:
- 我只是用
start$
点了点鼠标就开始了新的运行,对解决方案不重要
- 我用各种超时模拟了三个摄像头承诺函数,只是为了展示事情是如何按顺序执行的,但两个摄像头是如何并行执行的。
- 我还为每个相机功能传递了一个
camera
的变量,但这只是为了 console.log() 可以清楚地显示相机在做什么。
- 我没有对
redux-observable
做任何事情,而是保留原版 rxjs
。
- 我使用
concat()
将拍摄照片转换为可观察的序列,而不是像您那样使用承诺链将其保留下来 - 这不是必需的,只是一种不同的处理方式。
- 我将相机作为单独的 Observable(
cameraA$
和 cameraB$
)保留,但这也可以通过一组相机来完成。
请随意分叉并更改它以使其更接近您正在寻找的内容。
这是 StackBlitz 中的内容:
import { mergeMap, concatMap, tap } from 'rxjs/operators';
import { fromEvent, from, concat, merge, defer } from 'rxjs';
const moveCamera = (camera, angle) => new Promise(
(resolve, reject) => {
setTimeout(() => {
console.log(`moved: ${camera} angle: ${angle}`);
resolve();
}, 1000) }
);
const captureImage = (camera) => new Promise(
(resolve, reject) => {
setTimeout(() => {
console.log(`${camera} captured image.`);
resolve();
}, 100) }
);
const uploadImage = (camera) => new Promise(
(resolve, reject) => {
setTimeout(() => {
console.log(`${camera} uploaded image.`);
resolve();
}, 2000) }
);
const start$ = fromEvent(document.getElementById('start'), 'click');
const takeAPhoto$ = (camera, angle) => concat(
defer(() => moveCamera(camera, angle)),
defer(() => captureImage(camera)),
defer(() => uploadImage(camera))
);
const cameraA$ = from(['A1', 'A2', 'A3']).pipe(
concatMap(angle => takeAPhoto$('Camera A', angle))
);
const cameraB$ = from(['B1', 'B2', 'B3', 'B4']).pipe(
concatMap(angle => takeAPhoto$('Camera B', angle))
);
start$.pipe(
tap(() => console.log('\n\nstart new run')),
mergeMap(() => merge(cameraA$, cameraB$)),
).subscribe();
希望对您有所帮助。
你的问题归结为并行执行一些 Observables 并按顺序执行其他 Observables 并从 Promises 创建 Observables。
- 要并行执行多个 Observable,请使用:
然后你必须
- 构造一个要按顺序执行的 Observable 数组。
(单摄像头每个角度的动作)
- 构造一个要并行执行的 Observable 数组。
(每个相机的相机动作)。
这可能是纯 RxJS 中的代码
import { concat, forkJoin, merge, defer } from 'rxjs';
const cameras = ['Camera A', 'Camera B'];
const cameraAngles = {
'Camera A': ['A1', 'A2', 'A3'],
'Camera B': ['B1', 'B2', 'B3', 'B4']
}
// Performs a camera action consisting of multiple parts. Returns a Promise.
// camera: e.g. 'Camera A', angle: e.g. 'A1'
const doCameraAction = (camera, angle) => moveCamera(angle)
.then(() => captureImage())
.then(() => uploadImage())
.then(() => console.log('Image success'));
// Creates an Observables that executes multiple camera actions in sequence.
// camera: e.g. 'Camera A', angles: e.g. ['A1', 'A2', 'A3']
const getCameraActionSequence$ = (camera, angles) => concat(
// the array of Observables we want to execute in sequence
...angles.map(angle => defer(() => doCameraAction(camera, angle)))
);
// An Observable that will execute multiple camera action sequences in in parallel
const multiCameraActions$ = forkJoin(
// the array of Observables we want to execute in parallel
cameras.map(camera => getCameraActionSequence$(camera, cameraAngles[camera]))
);
我有一个场景,我必须控制多个 PTZ 摄像机,每个摄像机拍摄多个角度的照片。因此,例如:
Camera A
将取角度 A1
、A2
、A3
Camera B
将取角度 B1
、B2
、B3
、B4
将相机移动到正确的角度、拍摄图像并上传图像,这些都是 return 承诺的异步功能。
moveCamera( angle )
captureImage()
uploadImage()
摄像机必须并行操作,但每个摄像机的角度必须按顺序进行。
不知何故我觉得这可以通过 RxJS 轻松解决,但我正在努力将它们拼凑在一起。我能做的最好的就是类似于下面的解决方案,它以某种方式使相机按顺序相互处理。请注意,我使用 redux-observable
,下面的代码是我在 plain RxJS 中所能做到的最好的。请原谅我的 RxJS。
const angles = {
'Camera A': [ 'A1', 'A2', 'A3' ],
'Camera B': [ 'B1', 'B2', 'B3', 'B4' ],
}
const cameras = of( [ 'Camera A', 'Camera B' ] );
const cameraRun = cameras.pipe(
mergeMap( camera => {
// in redux-observable, I could return an array here
return of( angles[ camera ] );
} )
);
cameraRun.pipe(
concatMap( angle => {
return moveCamera( angle )
.then( () => captureImage() )
.then( () => uploadImage() )
.then( () => console.log( 'Image success' ) );
} )
)
知道redux-observable
的人,我有3个epics:
RUN_CAMERA_SET_ROUTINE
- 在 mergeMap
内运行所有相机
RUN_CAMERA_ROUTINE
- 为 mergeMap
内的每个摄像机运行所有角度
CAPTURE_IMAGE
- 在一个 concatMap
我最初的想法是 CAPTURE_IMAGE
将是 "grouped" 由于 mergeMap
生成流,但我错了。似乎 CAPTURE_IMAGE
仍然是一个流,在所有摄像机的每个角度排队。
任何指点都会很有帮助。
我会试一试。我在 StackBlitz 中提出了一个解决方案来展示我的想法。在点击按钮之前点击控制台开始新的运行.
关于此解决方案的一些要点:
- 我只是用
start$
点了点鼠标就开始了新的运行,对解决方案不重要 - 我用各种超时模拟了三个摄像头承诺函数,只是为了展示事情是如何按顺序执行的,但两个摄像头是如何并行执行的。
- 我还为每个相机功能传递了一个
camera
的变量,但这只是为了 console.log() 可以清楚地显示相机在做什么。 - 我没有对
redux-observable
做任何事情,而是保留原版rxjs
。 - 我使用
concat()
将拍摄照片转换为可观察的序列,而不是像您那样使用承诺链将其保留下来 - 这不是必需的,只是一种不同的处理方式。 - 我将相机作为单独的 Observable(
cameraA$
和cameraB$
)保留,但这也可以通过一组相机来完成。
请随意分叉并更改它以使其更接近您正在寻找的内容。
这是 StackBlitz 中的内容:
import { mergeMap, concatMap, tap } from 'rxjs/operators';
import { fromEvent, from, concat, merge, defer } from 'rxjs';
const moveCamera = (camera, angle) => new Promise(
(resolve, reject) => {
setTimeout(() => {
console.log(`moved: ${camera} angle: ${angle}`);
resolve();
}, 1000) }
);
const captureImage = (camera) => new Promise(
(resolve, reject) => {
setTimeout(() => {
console.log(`${camera} captured image.`);
resolve();
}, 100) }
);
const uploadImage = (camera) => new Promise(
(resolve, reject) => {
setTimeout(() => {
console.log(`${camera} uploaded image.`);
resolve();
}, 2000) }
);
const start$ = fromEvent(document.getElementById('start'), 'click');
const takeAPhoto$ = (camera, angle) => concat(
defer(() => moveCamera(camera, angle)),
defer(() => captureImage(camera)),
defer(() => uploadImage(camera))
);
const cameraA$ = from(['A1', 'A2', 'A3']).pipe(
concatMap(angle => takeAPhoto$('Camera A', angle))
);
const cameraB$ = from(['B1', 'B2', 'B3', 'B4']).pipe(
concatMap(angle => takeAPhoto$('Camera B', angle))
);
start$.pipe(
tap(() => console.log('\n\nstart new run')),
mergeMap(() => merge(cameraA$, cameraB$)),
).subscribe();
希望对您有所帮助。
你的问题归结为并行执行一些 Observables 并按顺序执行其他 Observables 并从 Promises 创建 Observables。
- 要并行执行多个 Observable,请使用:
然后你必须
- 构造一个要按顺序执行的 Observable 数组。
(单摄像头每个角度的动作) - 构造一个要并行执行的 Observable 数组。
(每个相机的相机动作)。
这可能是纯 RxJS 中的代码
import { concat, forkJoin, merge, defer } from 'rxjs';
const cameras = ['Camera A', 'Camera B'];
const cameraAngles = {
'Camera A': ['A1', 'A2', 'A3'],
'Camera B': ['B1', 'B2', 'B3', 'B4']
}
// Performs a camera action consisting of multiple parts. Returns a Promise.
// camera: e.g. 'Camera A', angle: e.g. 'A1'
const doCameraAction = (camera, angle) => moveCamera(angle)
.then(() => captureImage())
.then(() => uploadImage())
.then(() => console.log('Image success'));
// Creates an Observables that executes multiple camera actions in sequence.
// camera: e.g. 'Camera A', angles: e.g. ['A1', 'A2', 'A3']
const getCameraActionSequence$ = (camera, angles) => concat(
// the array of Observables we want to execute in sequence
...angles.map(angle => defer(() => doCameraAction(camera, angle)))
);
// An Observable that will execute multiple camera action sequences in in parallel
const multiCameraActions$ = forkJoin(
// the array of Observables we want to execute in parallel
cameras.map(camera => getCameraActionSequence$(camera, cameraAngles[camera]))
);