在 `toArray` 之前发出动作 - Redux Observable
Emit actions before `toArray` - Redux Observable
我正在使用 Redux Observable,需要解决从 epics 触发操作时的计时问题。
我有一组要循环的项目,以便对每个项目进行 AJAX 调用。收到 AJAX 响应后,我想立即执行一些操作。在对原始数组中的每个项目返回所有 AJAX 响应后,我想触发更多操作。
如何在 timer
过期后立即触发这些操作,即使原始数组尚未完成循环?
const someEpic = action$ => (
action$
.pipe(
ofType(SOME_ACTION),
switchMap(({ payload }) => (
from(payload) // This is an array of items
.pipe(
mergeMap(() => (
timer(5000) // This is my AJAX call
.pipe(
map(loadedAction),
)
)),
toArray(),
map(anotherAction),
)
))
)
)
可能最简单的方法是在任何您想要的地方实际使用 tap
发出动作。这是假设您有权访问商店。例如:
tap(result => this.store.dispatch(...))
然而,更 "Rx" 的方法是使用 multicast
拆分链,然后立即重新发送一部分(即加载进度),另一半与 toArray()
链接以收集然后所有结果都会变成另一个动作,表明加载已完成。
import { range, Subject, of } from 'rxjs';
import { multicast, delay, merge, concatMap, map, toArray } from 'rxjs/operators';
const process = v => of(v).pipe(
delay(1000),
map(p => `processed: ${p}`),
);
range(1, 5)
.pipe(
concatMap(v => process(v)),
multicast(
() => new Subject(),
s => s.pipe(
merge(s.pipe(toArray()))
)
),
)
.subscribe(console.log);
现场演示:https://stackblitz.com/edit/rxjs6-demo-k9hwtu?file=index.ts
这需要 2 个 epics。
const ajaxCallerEpic = action$ => (
action$
.pipe(
ofType(AJAX_ACTION),
switchMap(({
payload,
payloadId,
}) => (
merge(
from(payload) // This is an array of items
.pipe(
mergeMap(() => (
timer(5000) // This is my AJAX call
.pipe(
switchMap(() => (
merge(
of(loadedAction),
of(
sentData(
payloadId
)
),
)
)),
)
)),
),
)
))
)
)
const ajaxResponsesEpic = action$ => (
action$
.pipe(
ofType(AJAX_ACTION),
switchMap(({
payload,
payloadId,
}) => (
action$
.pipe(
ofType(SENT_DATA_ACTION),
filter(({ id }) => (
id === payloadId
)),
bufferCount(
payload
.length
),
map(anotherAction),
)
))
)
)
重要的是第二个SENT_DATA_ACTION
。我在调用时传递了一个唯一 ID,以确保您正在收听正确的内容。如果您不发送所有这些,只要浏览器打开,它就会一直监听。您始终可以在内部侦听器上添加超时以确保它完成。另一个问题是,如果 ajaxResponsesEpic
设置 action$
侦听器的时间晚于 ajaxCallerEpic
运行 AJAX 调用的时间。
它很可能最终成为 race
条件。为了解决这些问题,您需要先执行 ajaxResponsesEpic
以便设置动作侦听器,同时在侦听后启动 AJAX 调用。
像这样:
const ajaxCallerEpic = action$ => (
action$
.pipe(
ofType(AJAX_READY_ACTION),
switchMap(({
payload,
payloadId,
}) => (
merge(
from(payload) // This is an array of items
.pipe(
mergeMap(() => (
timer(5000) // This is my AJAX call
.pipe(
switchMap(() => (
merge(
of(loadedAction),
of(
sentAjaxData(
payloadId
)
),
)
)),
)
)),
),
)
))
)
)
const ajaxResponsesEpic = action$ => (
action$
.pipe(
ofType(AJAX_ACTION),
switchMap(({
payload,
payloadId,
}) => (
merge(
(
action$
.pipe(
ofType(SENT_AJAX_DATA_ACTION),
filter(({ id }) => (
id === payloadId
)),
bufferCount(
payload
.length
),
map(anotherAction),
)
),
(
of(ajaxReadyAction)
),
)
))
)
)
我正在使用 Redux Observable,需要解决从 epics 触发操作时的计时问题。
我有一组要循环的项目,以便对每个项目进行 AJAX 调用。收到 AJAX 响应后,我想立即执行一些操作。在对原始数组中的每个项目返回所有 AJAX 响应后,我想触发更多操作。
如何在 timer
过期后立即触发这些操作,即使原始数组尚未完成循环?
const someEpic = action$ => (
action$
.pipe(
ofType(SOME_ACTION),
switchMap(({ payload }) => (
from(payload) // This is an array of items
.pipe(
mergeMap(() => (
timer(5000) // This is my AJAX call
.pipe(
map(loadedAction),
)
)),
toArray(),
map(anotherAction),
)
))
)
)
可能最简单的方法是在任何您想要的地方实际使用 tap
发出动作。这是假设您有权访问商店。例如:
tap(result => this.store.dispatch(...))
然而,更 "Rx" 的方法是使用 multicast
拆分链,然后立即重新发送一部分(即加载进度),另一半与 toArray()
链接以收集然后所有结果都会变成另一个动作,表明加载已完成。
import { range, Subject, of } from 'rxjs';
import { multicast, delay, merge, concatMap, map, toArray } from 'rxjs/operators';
const process = v => of(v).pipe(
delay(1000),
map(p => `processed: ${p}`),
);
range(1, 5)
.pipe(
concatMap(v => process(v)),
multicast(
() => new Subject(),
s => s.pipe(
merge(s.pipe(toArray()))
)
),
)
.subscribe(console.log);
现场演示:https://stackblitz.com/edit/rxjs6-demo-k9hwtu?file=index.ts
这需要 2 个 epics。
const ajaxCallerEpic = action$ => (
action$
.pipe(
ofType(AJAX_ACTION),
switchMap(({
payload,
payloadId,
}) => (
merge(
from(payload) // This is an array of items
.pipe(
mergeMap(() => (
timer(5000) // This is my AJAX call
.pipe(
switchMap(() => (
merge(
of(loadedAction),
of(
sentData(
payloadId
)
),
)
)),
)
)),
),
)
))
)
)
const ajaxResponsesEpic = action$ => (
action$
.pipe(
ofType(AJAX_ACTION),
switchMap(({
payload,
payloadId,
}) => (
action$
.pipe(
ofType(SENT_DATA_ACTION),
filter(({ id }) => (
id === payloadId
)),
bufferCount(
payload
.length
),
map(anotherAction),
)
))
)
)
重要的是第二个SENT_DATA_ACTION
。我在调用时传递了一个唯一 ID,以确保您正在收听正确的内容。如果您不发送所有这些,只要浏览器打开,它就会一直监听。您始终可以在内部侦听器上添加超时以确保它完成。另一个问题是,如果 ajaxResponsesEpic
设置 action$
侦听器的时间晚于 ajaxCallerEpic
运行 AJAX 调用的时间。
它很可能最终成为 race
条件。为了解决这些问题,您需要先执行 ajaxResponsesEpic
以便设置动作侦听器,同时在侦听后启动 AJAX 调用。
像这样:
const ajaxCallerEpic = action$ => (
action$
.pipe(
ofType(AJAX_READY_ACTION),
switchMap(({
payload,
payloadId,
}) => (
merge(
from(payload) // This is an array of items
.pipe(
mergeMap(() => (
timer(5000) // This is my AJAX call
.pipe(
switchMap(() => (
merge(
of(loadedAction),
of(
sentAjaxData(
payloadId
)
),
)
)),
)
)),
),
)
))
)
)
const ajaxResponsesEpic = action$ => (
action$
.pipe(
ofType(AJAX_ACTION),
switchMap(({
payload,
payloadId,
}) => (
merge(
(
action$
.pipe(
ofType(SENT_AJAX_DATA_ACTION),
filter(({ id }) => (
id === payloadId
)),
bufferCount(
payload
.length
),
map(anotherAction),
)
),
(
of(ajaxReadyAction)
),
)
))
)
)