RxJS:可观察流通过管道传输到 groupBy(),然后是 concatMap();后续密钥的数据丢失
RxJS: Observable stream piped to groupBy() followed by concatMap(); data for subsequent keys lost
我正在尝试使用 RxJS groupBy
运算符,然后使用 concatMap
来根据一些键将记录收集到各个组中。
我注意到当 concatMap
跟在 groupBy
运算符后,它似乎丢失了第一个之后出现的所有键的数据。
例如:
考虑以下代码块:
// DOES NOT WORK
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();
const result = clicks.pipe(
groupBy(x => x.substr(2,1)),
concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));
records.forEach(x => clicks.next(x));
// Expected Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
//
// Actual Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// ...Nothing more -- no results for key 2 and 3
但是,当我单独使用 concatMap
运算符时,它的行为符合预期。
// WORKS
const records = ['a', 'b', 'c', 'd', 'e', 'f', 'g'];
const clicks = new Subject();
const result = clicks.pipe(
concatMap(ev => ev.subject$.pipe(take(4), map(x => ev.key + x))),
);
const subscription = result.subscribe(x => console.log(x));
records.forEach(x => clicks.next({key: x, subject$: interval(1000)}));
// Expected & Actual Output:
// a0
// a1
// a2
// a3
// b0
// b1
// b2
// b3
// c0
// c1
// c2
// c3
// d0
// d1
// d2
// d3
// e0
// e1
// e2
// e3
// f0
// f1
// f2
// f3
// g0
// g1
// g2
// g3
阅读 RxJS 的文档 groupBy
and concatMap
does not provide me with any clues as to what could be going on here. Whereas the section on RxJS concatMap
at reactivex.io 让我相信这应该有效。
任何人都可以帮助我了解这里的第一个场景是怎么回事吗?我怎样才能让第一个场景起作用?
我终于明白问题出在哪里了。
在上述问题的场景 #1 中,代码首先将源流通过管道传输到 groupBy
运算符,然后是 concatMap
运算符。而这种运算符组合似乎导致了这个问题。
groupBy
和 mergeMap
的内部运作
通读 the code for the groupBy
operator,我意识到 groupBy
在内部为在源流中找到的每个键创建了一个新的 Subject
实例。然后,属于该键的所有值立即由该 Subject
实例发出。
所有 Subject
实例都包装到 GroupedObservale
中,并由 groupBy
运算符向下游发出。这个 GroupedObservable
实例流是 concatMap
运算符的输入。
concatMap
运算符在内部调用 mergeMap
运算符,concurrency
的值为 1,这意味着只有一个源可观察值被同时订阅。
mergeMap
运算符仅订阅一个 observable,或 conccurency
参数允许的尽可能多的 observable,并将所有其他 observable 保存在 "buffer" 中,直到第一个订阅完成。
这是如何产生问题的?
首先,既然我已经通读了这些运算符的代码,我不太确定这是否是 "problem"。
然而,我在问题中描述的行为发生了,因为虽然 groupBy
运算符立即使用相应的 Subject
实例发出单个值,但 mergeMap
运算符不会订阅那个特别是 Subject
。因此,使用 Subject
发出的源流中的所有值都将丢失。
我试图用一个粗略的弹珠图来说明这个问题:
这不是 "problem" 这些运算符的工作方式,但可能与我理解这些运算符的方式以及可能的文档(特别是 concatMap
的文档可能有点混乱RxJS 新手)。
这可以通过让 groupBy
运算符使用 ReplaySubject
而不是 Subject
来发出分组值来轻松解决。 groupBy
接受一个 subjectSelector
参数,允许我们用 ReplaySubject
实例切换 Subject
实例。
以下代码有效:
// THIS VERSION WORKS
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();
const result = clicks.pipe(
groupBy(x => x.substr(2,1), null, null, () => new ReplaySubject()),
concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));
records.forEach(x => clicks.next(x));
// We also need to explicity complete() the source
// stream to ensure that the observable stream for
// the first GroupedObservable completes allowing
// the concatMap operator to move to the second
// GroupedObservable.
clicks.complete();
// Expected and Actual output
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
为什么方案 2 有效?
我的问题中的场景 2 工作正常,因为 interval
只是创建了一个 Observable 但没有开始发射值。因此,当 mergeMap
最终订阅它时,该 Observable 的所有值都可用。
我的回答是对 Kiran 的补充,并指出如果您使用异步 mergeMap,您将遇到与问题中描述的完全相同的问题。
当您使用 groupBy
时,正如 Kiren 解释的那样,它会在内部创建一个 Subject
立即订阅源。以下作品...
source.pipe(
groupBy(item => item.id),
mergeMap(byId => {
return byId.pipe(map(x=>service.put(x)));
}),
... 因为(据我所知)订阅是同步的 - mergeMap 会立即订阅每个新分组(假设没有并发限制),因此它会捕获数据。
如果您想按分组异步执行某些操作,您可以尝试...
source.pipe(
groupBy(item => item.id),
mergeMap(async byId => {
let service = await this.getSomething(byId.key);
return byId.pipe(map(x=>service.put(x)));
}),
mergeAll()
...此时对分组 Observable 的订阅被推迟到 mergeAll
并且它将 错过 初始数据。
解决方案正如 Kiran 所说:您必须使用缓冲主题,以便在最终订阅该组时也可以重播这些值:
groupBy(item => item.id, null, null,()=>new ReplaySubject())
会很好地工作。
我的个人解决方案是创建一个自定义 BufferSubject
仅在第一次订阅之前缓冲的自定义 在初始订阅后不希望任何缓冲,然后简单地通过 next
传递给基础 Subject。
/** buffers items until the first subscription, then replays them and stops buffering */
export class BufferSubject<T> extends Subject<T>{
private _events: T[] = [];
constructor(private scheduler?: SchedulerLike) {
super();
}
next(value: T) {
this._events.push(value);
super.next(value);
}
_subscribe(subscriber: Subscriber<T>): Subscription {
const _events = this._events;
//stop buffering
this.next = super.next;
this._events = null;
const scheduler = this.scheduler;
const len = _events.length;
let subscription: Subscription;
if (this.closed) {
throw new ObjectUnsubscribedError();
} else if (this.isStopped || this.hasError) {
subscription = Subscription.EMPTY;
} else {
this.observers.push(subscriber);
subscription = new SubjectSubscription(this, subscriber);
}
if (scheduler) {
subscriber.add(subscriber = new ObserveOnSubscriber<T>(subscriber, scheduler));
}
for (let i = 0; i < len && !subscriber.closed; i++) {
subscriber.next(_events[i]);
}
if (this.hasError) {
subscriber.error(this.thrownError);
} else if (this.isStopped) {
subscriber.complete();
}
return subscription;
}
}
/** from rxjs internals */
export class SubjectSubscription<T> extends Subscription {
closed: boolean = false;
constructor(public subject: Subject<T>, public subscriber: Observer<T>) {
super();
}
unsubscribe() {
if (this.closed) {
return;
}
this.closed = true;
const subject = this.subject;
const observers = subject.observers;
this.subject = null;
if (!observers || observers.length === 0 || subject.isStopped || subject.closed) {
return;
}
const subscriberIndex = observers.indexOf(this.subscriber);
if (subscriberIndex !== -1) {
observers.splice(subscriberIndex, 1);
}
}
}
并代替回放使用:
groupBy(item => item.id, null, null,()=>new BufferSubject())
我正在尝试使用 RxJS groupBy
运算符,然后使用 concatMap
来根据一些键将记录收集到各个组中。
我注意到当 concatMap
跟在 groupBy
运算符后,它似乎丢失了第一个之后出现的所有键的数据。
例如:
考虑以下代码块:
// DOES NOT WORK
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();
const result = clicks.pipe(
groupBy(x => x.substr(2,1)),
concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));
records.forEach(x => clicks.next(x));
// Expected Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
//
// Actual Output:
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// ...Nothing more -- no results for key 2 and 3
但是,当我单独使用 concatMap
运算符时,它的行为符合预期。
// WORKS
const records = ['a', 'b', 'c', 'd', 'e', 'f', 'g'];
const clicks = new Subject();
const result = clicks.pipe(
concatMap(ev => ev.subject$.pipe(take(4), map(x => ev.key + x))),
);
const subscription = result.subscribe(x => console.log(x));
records.forEach(x => clicks.next({key: x, subject$: interval(1000)}));
// Expected & Actual Output:
// a0
// a1
// a2
// a3
// b0
// b1
// b2
// b3
// c0
// c1
// c2
// c3
// d0
// d1
// d2
// d3
// e0
// e1
// e2
// e3
// f0
// f1
// f2
// f3
// g0
// g1
// g2
// g3
阅读 RxJS 的文档 groupBy
and concatMap
does not provide me with any clues as to what could be going on here. Whereas the section on RxJS concatMap
at reactivex.io 让我相信这应该有效。
任何人都可以帮助我了解这里的第一个场景是怎么回事吗?我怎样才能让第一个场景起作用?
我终于明白问题出在哪里了。
在上述问题的场景 #1 中,代码首先将源流通过管道传输到 groupBy
运算符,然后是 concatMap
运算符。而这种运算符组合似乎导致了这个问题。
groupBy
和 mergeMap
的内部运作
通读 the code for the groupBy
operator,我意识到 groupBy
在内部为在源流中找到的每个键创建了一个新的 Subject
实例。然后,属于该键的所有值立即由该 Subject
实例发出。
所有 Subject
实例都包装到 GroupedObservale
中,并由 groupBy
运算符向下游发出。这个 GroupedObservable
实例流是 concatMap
运算符的输入。
concatMap
运算符在内部调用 mergeMap
运算符,concurrency
的值为 1,这意味着只有一个源可观察值被同时订阅。
mergeMap
运算符仅订阅一个 observable,或 conccurency
参数允许的尽可能多的 observable,并将所有其他 observable 保存在 "buffer" 中,直到第一个订阅完成。
这是如何产生问题的?
首先,既然我已经通读了这些运算符的代码,我不太确定这是否是 "problem"。
然而,我在问题中描述的行为发生了,因为虽然 groupBy
运算符立即使用相应的 Subject
实例发出单个值,但 mergeMap
运算符不会订阅那个特别是 Subject
。因此,使用 Subject
发出的源流中的所有值都将丢失。
我试图用一个粗略的弹珠图来说明这个问题:
这不是 "problem" 这些运算符的工作方式,但可能与我理解这些运算符的方式以及可能的文档(特别是 concatMap
的文档可能有点混乱RxJS 新手)。
这可以通过让 groupBy
运算符使用 ReplaySubject
而不是 Subject
来发出分组值来轻松解决。 groupBy
接受一个 subjectSelector
参数,允许我们用 ReplaySubject
实例切换 Subject
实例。
以下代码有效:
// THIS VERSION WORKS
const records = ['a:1', 'b:2', 'c:3', 'd:1', 'e:2', 'f:3', 'g:1'];
const clicks = new Subject();
const result = clicks.pipe(
groupBy(x => x.substr(2,1), null, null, () => new ReplaySubject()),
concatMap(ev$ => ev$.pipe(map(x => ({key: ev$.key, value: x})))),
);
const subscription = result.subscribe(x => console.log(x));
records.forEach(x => clicks.next(x));
// We also need to explicity complete() the source
// stream to ensure that the observable stream for
// the first GroupedObservable completes allowing
// the concatMap operator to move to the second
// GroupedObservable.
clicks.complete();
// Expected and Actual output
// { key: '1', value: 'a:1' }
// { key: '1', value: 'd:1' }
// { key: '1', value: 'g:1' }
// { key: '2', value: 'b:2' }
// { key: '2', value: 'e:2' }
// { key: '3', value: 'c:3' }
// { key: '3', value: 'f:3' }
为什么方案 2 有效?
我的问题中的场景 2 工作正常,因为 interval
只是创建了一个 Observable 但没有开始发射值。因此,当 mergeMap
最终订阅它时,该 Observable 的所有值都可用。
我的回答是对 Kiran 的补充,并指出如果您使用异步 mergeMap,您将遇到与问题中描述的完全相同的问题。
当您使用 groupBy
时,正如 Kiren 解释的那样,它会在内部创建一个 Subject
立即订阅源。以下作品...
source.pipe(
groupBy(item => item.id),
mergeMap(byId => {
return byId.pipe(map(x=>service.put(x)));
}),
... 因为(据我所知)订阅是同步的 - mergeMap 会立即订阅每个新分组(假设没有并发限制),因此它会捕获数据。
如果您想按分组异步执行某些操作,您可以尝试...
source.pipe(
groupBy(item => item.id),
mergeMap(async byId => {
let service = await this.getSomething(byId.key);
return byId.pipe(map(x=>service.put(x)));
}),
mergeAll()
...此时对分组 Observable 的订阅被推迟到 mergeAll
并且它将 错过 初始数据。
解决方案正如 Kiran 所说:您必须使用缓冲主题,以便在最终订阅该组时也可以重播这些值:
groupBy(item => item.id, null, null,()=>new ReplaySubject())
会很好地工作。
我的个人解决方案是创建一个自定义 BufferSubject
仅在第一次订阅之前缓冲的自定义 在初始订阅后不希望任何缓冲,然后简单地通过 next
传递给基础 Subject。
/** buffers items until the first subscription, then replays them and stops buffering */
export class BufferSubject<T> extends Subject<T>{
private _events: T[] = [];
constructor(private scheduler?: SchedulerLike) {
super();
}
next(value: T) {
this._events.push(value);
super.next(value);
}
_subscribe(subscriber: Subscriber<T>): Subscription {
const _events = this._events;
//stop buffering
this.next = super.next;
this._events = null;
const scheduler = this.scheduler;
const len = _events.length;
let subscription: Subscription;
if (this.closed) {
throw new ObjectUnsubscribedError();
} else if (this.isStopped || this.hasError) {
subscription = Subscription.EMPTY;
} else {
this.observers.push(subscriber);
subscription = new SubjectSubscription(this, subscriber);
}
if (scheduler) {
subscriber.add(subscriber = new ObserveOnSubscriber<T>(subscriber, scheduler));
}
for (let i = 0; i < len && !subscriber.closed; i++) {
subscriber.next(_events[i]);
}
if (this.hasError) {
subscriber.error(this.thrownError);
} else if (this.isStopped) {
subscriber.complete();
}
return subscription;
}
}
/** from rxjs internals */
export class SubjectSubscription<T> extends Subscription {
closed: boolean = false;
constructor(public subject: Subject<T>, public subscriber: Observer<T>) {
super();
}
unsubscribe() {
if (this.closed) {
return;
}
this.closed = true;
const subject = this.subject;
const observers = subject.observers;
this.subject = null;
if (!observers || observers.length === 0 || subject.isStopped || subject.closed) {
return;
}
const subscriberIndex = observers.indexOf(this.subscriber);
if (subscriberIndex !== -1) {
observers.splice(subscriberIndex, 1);
}
}
}
并代替回放使用:
groupBy(item => item.id, null, null,()=>new BufferSubject())