在 RxJs 中,如何在按顺序返回结果数组的同时在 mergemap 中执行序列?
In RxJs, How do I execute a sequence inside a mergemap while returning the array of results in order?
我有一个函数可以将一条消息分解成多个消息块。为了我的 post 功能,我需要对这些消息进行 posted。但是我不希望 Observable 阻止其他 post 传入。我的解决方案是合并映射中的 concat 运算符的某种组合,但我似乎无法使其正常工作
我不确定我是否可以制作图表,但这是我的尝试:
-1-2------3|->
--4--5--6|->
desired output:
[4,5,6]
[1,2,3]
我需要请求 1 在 2 之前、3 之前和 4 在 5 之前和 6 之前执行。
在英语中,我想我会有一个可观察的可观察对象,我希望它映射到可观察的流中,然后为每个可观察的输出流映射到一个标准数组。我只是不确定该怎么做。很长一段时间以来,我一直在弄乱代码,试图将我刚才所说的内容概念化,这是我最好的尝试:
interface SendInfo {
message: discord.Message
content: string
options?: discord.MessageOptions
}
export const sendMessage$: Subject<SendInfo> = new Subject();
const regex = /[\s\S]{1,1980}(?:\n|$)/g;
export const sentMessages$ = sendMessage$.pipe(
mergeMap(
(input: SendInfo):
Observable<(discord.Message | discord.Message[] | null)[]> => {
const chunks: string[] = input.content.match(regex) || [];
const superObservable: Observable<Observable<discord.Message | discord.Message[] | null>> = concat(chunks.map(
(chunk: string):
Observable<discord.Message | discord.Message[] | null> => {
const bound = input.message.channel.send.bind(
undefined,
chunk,
input.options,
);
return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
bound,
);
}
));
return superObservable.pipe(
mergeMap(e => e),
toArray(),
);
}
),
tap((e): void => Utils.logger.fatal(e)),
share(),
);
我的输出:
[2019-10-21T17:24:15.322] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg1' } ]
[2019-10-21T17:24:15.324] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg2' } ]
[2019-10-21T17:24:15.325] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg3' } ]
我觉得我快找到解决方案了,但我不知道如何将它准确地合并到一个数组中。我也不知道它在功能上是否正确。
我之前在这个问题中解决过保存顺序的问题
因此,使用我的 parallelExecute,您可以将值缩减为数组。
parallelExecute(...yourObservables).pipe(
reduce((results, item) => [...results, item], [])
);
这里是 parallelExecute 函数。
const { BehaviorSubject, Subject } = rxjs;
const { filter } = rxjs.operators;
const parallelExecute = (...obs$) => {
const subjects = obs$.map(o$ => {
const subject$ = new BehaviorSubject();
const sub = o$.subscribe(o => { subject$.next(o); });
return { sub: sub, obs$: subject$.pipe(filter(val => val)) };
});
const subject$ = new Subject();
sub(0);
function sub(index) {
const current = subjects[index];
current.obs$.subscribe(c => {
subject$.next(c);
current.obs$.complete();
current.sub.unsubscribe();
if (index < subjects.length -1) {
sub(index + 1);
} else {
subject$.complete();
}
});
}
return subject$;
}
我发现我正在寻找的运算符是 combineAll() 而不是 toArray() 在 concat 语句之后。我还有另一个实现以及承诺。现在我相信这两个都应该有效,但我会 post 我更确定第一个是承诺。
使用 promises 的实现一:
export const sentMessages$ = sendMessage$.pipe(
mergeMap(
(input: SendInfo):
Observable<(discord.Message | null)[]> => {
const chunks: string[] = input.content.match(regex) || [];
const observables: Observable<(discord.Message | null)[]>[] = chunks.map(
(chunk: string):
Observable<(discord.Message | null)[]> => {
const bound = input.message.channel.send.bind(
undefined,
chunk,
input.options,
);
// eslint-disable-next-line max-len
return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
bound,
).pipe(
// eslint-disable-next-line comma-dangle
map((x): (discord.Message | null)[] => [x].flatMap(
(t): (discord.Message | discord.Message[] | null) => t
))
);
}
);
const promises = observables
.map(
(obs: Observable<(discord.Message | null)[]>):
Promise<(discord.Message | null)[]> => obs.toPromise()
);
const reduced = promises
.reduce(async (promiseChain, currentTask):
Promise<(discord.Message | null)[]> => [
...await promiseChain,
...await currentTask,
].flatMap((x): (discord.Message | null) => x));
return from(reduced);
}
),
share(),
);
实现两个纯 RxJ:
export const sentMessages$ = sendMessage$.pipe(
mergeMap(
(input: SendInfo):
Observable<(discord.Message | null)[]> => {
const chunks: string[] = input.content.match(regex) || [];
const observables: Observable<(discord.Message | null)[]>[] = chunks.map(
(chunk: string):
Observable<(discord.Message | null)[]> => {
const bound = input.message.channel.send.bind(
undefined,
chunk,
input.options,
);
// eslint-disable-next-line max-len
return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
bound,
).pipe(
// eslint-disable-next-line comma-dangle
map((x): (discord.Message | null)[] => [x].flatMap(
(t): (discord.Message | discord.Message[] | null) => t
))
);
}
);
return concat(observables).pipe(
combineAll(),
map(x => x.flatMap(t => t)),
);
}
),
share(),
);
我相信 promises 会起作用,因为它被缩减为一个显式链。我不确定我是否遗漏了 concat 运算符的一些细微差别,所以我不能 100% 确定它会起作用。由于误解了 promise 在 RxJs 中使用 defer 运算符执行的方式,我编写的测试平台实际上无法正常工作,但根据我的测试平台,我得到了预期的顺序。我相信我的误解是我没有轻易想出这些解决方案的原因。
[2019-10-23T06:09:13.948] [FATAL] messageWrapper.ts:109 - [
{ channel: { send: [Function] }, content: 'msg7' },
{ channel: { send: [Function] }, content: 'msg8' },
{ channel: { send: [Function] }, content: 'msg9' }
]
[2019-10-23T06:09:14.243] [FATAL] messageWrapper.ts:109 - [
{ channel: { send: [Function] }, content: 'msg4' },
{ channel: { send: [Function] }, content: 'msg5' },
{ channel: { send: [Function] }, content: 'msg6' }
]
[2019-10-23T06:09:14.640] [FATAL] messageWrapper.ts:109 - [
{ channel: { send: [Function] }, content: 'msg1' },
{ channel: { send: [Function] }, content: 'msg2' },
{ channel: { send: [Function] }, content: 'msg3' }
]
✓ should execute concurrently. (753ms)
我有一个函数可以将一条消息分解成多个消息块。为了我的 post 功能,我需要对这些消息进行 posted。但是我不希望 Observable 阻止其他 post 传入。我的解决方案是合并映射中的 concat 运算符的某种组合,但我似乎无法使其正常工作
我不确定我是否可以制作图表,但这是我的尝试:
-1-2------3|->
--4--5--6|->
desired output:
[4,5,6]
[1,2,3]
我需要请求 1 在 2 之前、3 之前和 4 在 5 之前和 6 之前执行。 在英语中,我想我会有一个可观察的可观察对象,我希望它映射到可观察的流中,然后为每个可观察的输出流映射到一个标准数组。我只是不确定该怎么做。很长一段时间以来,我一直在弄乱代码,试图将我刚才所说的内容概念化,这是我最好的尝试:
interface SendInfo {
message: discord.Message
content: string
options?: discord.MessageOptions
}
export const sendMessage$: Subject<SendInfo> = new Subject();
const regex = /[\s\S]{1,1980}(?:\n|$)/g;
export const sentMessages$ = sendMessage$.pipe(
mergeMap(
(input: SendInfo):
Observable<(discord.Message | discord.Message[] | null)[]> => {
const chunks: string[] = input.content.match(regex) || [];
const superObservable: Observable<Observable<discord.Message | discord.Message[] | null>> = concat(chunks.map(
(chunk: string):
Observable<discord.Message | discord.Message[] | null> => {
const bound = input.message.channel.send.bind(
undefined,
chunk,
input.options,
);
return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
bound,
);
}
));
return superObservable.pipe(
mergeMap(e => e),
toArray(),
);
}
),
tap((e): void => Utils.logger.fatal(e)),
share(),
);
我的输出:
[2019-10-21T17:24:15.322] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg1' } ]
[2019-10-21T17:24:15.324] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg2' } ]
[2019-10-21T17:24:15.325] [FATAL] messageWrapper.ts:72 - [ { channel: { send: [Function] }, content: 'msg3' } ]
我觉得我快找到解决方案了,但我不知道如何将它准确地合并到一个数组中。我也不知道它在功能上是否正确。
我之前在这个问题中解决过保存顺序的问题
因此,使用我的 parallelExecute,您可以将值缩减为数组。
parallelExecute(...yourObservables).pipe(
reduce((results, item) => [...results, item], [])
);
这里是 parallelExecute 函数。
const { BehaviorSubject, Subject } = rxjs;
const { filter } = rxjs.operators;
const parallelExecute = (...obs$) => {
const subjects = obs$.map(o$ => {
const subject$ = new BehaviorSubject();
const sub = o$.subscribe(o => { subject$.next(o); });
return { sub: sub, obs$: subject$.pipe(filter(val => val)) };
});
const subject$ = new Subject();
sub(0);
function sub(index) {
const current = subjects[index];
current.obs$.subscribe(c => {
subject$.next(c);
current.obs$.complete();
current.sub.unsubscribe();
if (index < subjects.length -1) {
sub(index + 1);
} else {
subject$.complete();
}
});
}
return subject$;
}
我发现我正在寻找的运算符是 combineAll() 而不是 toArray() 在 concat 语句之后。我还有另一个实现以及承诺。现在我相信这两个都应该有效,但我会 post 我更确定第一个是承诺。
使用 promises 的实现一:
export const sentMessages$ = sendMessage$.pipe(
mergeMap(
(input: SendInfo):
Observable<(discord.Message | null)[]> => {
const chunks: string[] = input.content.match(regex) || [];
const observables: Observable<(discord.Message | null)[]>[] = chunks.map(
(chunk: string):
Observable<(discord.Message | null)[]> => {
const bound = input.message.channel.send.bind(
undefined,
chunk,
input.options,
);
// eslint-disable-next-line max-len
return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
bound,
).pipe(
// eslint-disable-next-line comma-dangle
map((x): (discord.Message | null)[] => [x].flatMap(
(t): (discord.Message | discord.Message[] | null) => t
))
);
}
);
const promises = observables
.map(
(obs: Observable<(discord.Message | null)[]>):
Promise<(discord.Message | null)[]> => obs.toPromise()
);
const reduced = promises
.reduce(async (promiseChain, currentTask):
Promise<(discord.Message | null)[]> => [
...await promiseChain,
...await currentTask,
].flatMap((x): (discord.Message | null) => x));
return from(reduced);
}
),
share(),
);
实现两个纯 RxJ:
export const sentMessages$ = sendMessage$.pipe(
mergeMap(
(input: SendInfo):
Observable<(discord.Message | null)[]> => {
const chunks: string[] = input.content.match(regex) || [];
const observables: Observable<(discord.Message | null)[]>[] = chunks.map(
(chunk: string):
Observable<(discord.Message | null)[]> => {
const bound = input.message.channel.send.bind(
undefined,
chunk,
input.options,
);
// eslint-disable-next-line max-len
return Network.genericNetworkObservable<discord.Message | discord.Message[]>(
bound,
).pipe(
// eslint-disable-next-line comma-dangle
map((x): (discord.Message | null)[] => [x].flatMap(
(t): (discord.Message | discord.Message[] | null) => t
))
);
}
);
return concat(observables).pipe(
combineAll(),
map(x => x.flatMap(t => t)),
);
}
),
share(),
);
我相信 promises 会起作用,因为它被缩减为一个显式链。我不确定我是否遗漏了 concat 运算符的一些细微差别,所以我不能 100% 确定它会起作用。由于误解了 promise 在 RxJs 中使用 defer 运算符执行的方式,我编写的测试平台实际上无法正常工作,但根据我的测试平台,我得到了预期的顺序。我相信我的误解是我没有轻易想出这些解决方案的原因。
[2019-10-23T06:09:13.948] [FATAL] messageWrapper.ts:109 - [
{ channel: { send: [Function] }, content: 'msg7' },
{ channel: { send: [Function] }, content: 'msg8' },
{ channel: { send: [Function] }, content: 'msg9' }
]
[2019-10-23T06:09:14.243] [FATAL] messageWrapper.ts:109 - [
{ channel: { send: [Function] }, content: 'msg4' },
{ channel: { send: [Function] }, content: 'msg5' },
{ channel: { send: [Function] }, content: 'msg6' }
]
[2019-10-23T06:09:14.640] [FATAL] messageWrapper.ts:109 - [
{ channel: { send: [Function] }, content: 'msg1' },
{ channel: { send: [Function] }, content: 'msg2' },
{ channel: { send: [Function] }, content: 'msg3' }
]
✓ should execute concurrently. (753ms)