使用 mergeMap 分解和转换后重建非完成的 Observable Array
Rebuilding non-completing Observable Array after decomposing with mergeMap and transforming
我有一个带有 publishReplay 和 refcount 的主题,所以它实际上永远不会完成。 refcount 的输出是一个 Observable 对象数组。我想将 Observable 对象数组分解为发出的元素,根据需要进行转换,然后将它们合并回一个数组,但是由于源是非完成的,数组边界在异步性质中是 'lost'。
举个例子,我使用mergeMap和scan来积累数据。但是,正如预期的那样,它会不断累积,因此生成的数组是流的整个历史记录。扫描的初始值仅在流的开头进行初始化。我不能使用 toArray,因为 Observable 永远不会终止。
我在异步硬件设计方面有很多经验。这个异步链的硬件类比是使用锁存器。我不确定 RxJS 中的概念等价物是什么。我假设从 refcount 获取发出的数组输出并将其应用于 Observable.from(theArrayOutput) 会起作用,但我不知道如何将它插入流链中。
import {Component, OnInit} from '@angular/core';
import {Observable} from 'rxjs/Observable';
import {Subject} from 'rxjs/Subject';
type IObjectsOperation = (types: Object[]) => Object[];
@Component({
selector: 'app-events-test',
templateUrl: './events-test.component.html',
styleUrls: ['./events-test.component.css']
})
export class EventsTestComponent implements OnInit {
public objects: Observable<Object[]>;
public scanned: Observable<any>;
protected updates: Subject<any> = new Subject<any>();
protected functionStream: Subject<any> = new Subject<any>();
protected addStream: Subject<Object> = new Subject<Object>();
private initialObjects: Object[] = [1];
constructor() {
this.objects = this.updates
.scan((objects: Object[],
operation: IObjectsOperation) => {
return operation(objects);
},
this.initialObjects)
.publishReplay(1)
.refCount();
this.functionStream
.map(function (message: Object): IObjectsOperation {
return (messages: Object[]) => {
return messages.concat(message);
};
})
.subscribe(this.updates);
this.addStream.subscribe(this.functionStream);
this.scanned = this.objects
.mergeMap(val => val)
// some transformation that I want to have happen, in this case no-op
.filter(() => {return true})
// attempt to rebuild array, but items are accumulated
.scan( (acc: Array<any>, x: Object) => { return acc.concat(x); }, [])
}
transform(objects) {
return Observable.from(objects)
// this withLatestFrom suggestion didn't work
// .withLatestFrom( this.functionStream, ( val, fn ) => fn( val ) )
.filter(() => {
return true
})
.toArray()
}
start(): void {
console.log('---------STARTING');
this.objects.mergeMap(obj => this.transform(obj))
.subscribe(
obj => console.log('objects: ' + obj)
);
this.scanned.subscribe(
{
next: obj => {
console.log('scanned: ' + obj);
},
error: () => {
},
complete: () => console.log('COMPLETED')
}
);
this.add(2);
this.add(3);
}
add(object: Object): void {
this.addStream.next(object);
}
ngOnInit() {
this.start();
}
}
输出低于预期。
---------STARTING
events-test.component.ts:49 objects: 1,2
events-test.component.ts:52 scanned: 1
events-test.component.ts:52 scanned: 1,2
events-test.component.ts:49 objects: 1,2,3
events-test.component.ts:52 scanned: 1,2,1
events-test.component.ts:52 scanned: 1,2,1,2
events-test.component.ts:52 scanned: 1,2,1,2,3
我想看到的是:
---------STARTING
events-test.component.ts:49 objects: 1,2
events-test.component.ts:52 scanned: 1,2
events-test.component.ts:49 objects: 1,2,3
events-test.component.ts:52 scanned: 1,2,3
我有一些可以使用的解决方法:
- 在整个流中将数据保存为数组
- 将源非完成可观察到完成
我假设此架构的解决方案是:
- 我可以将一些操作插入到流中以从对象 Observable 中获取数组,return 一个完成的 Observable
- 有一些外部信号需要从源 Observable 发出,指示 'processing' 然后终止边界扫描。这对我来说似乎很笨拙。
我假设 RxJS 中会有一些等效于异步硬件锁存器的东西,所以我想保留当前的架构。此外,我认为 RxJS 真的很酷,并且想弥补我在基于时间的流处理方面的知识差距。
编辑:@xtianjohns 的一个很好的回答指出了如何进行内部循环,但订阅仍然没有完成。建议添加 withLatestFrom 导致转换函数中断,在注释掉该行时,数组已呈现,但外部循环未完成。如果存在该行,则不会呈现数组,并且循环不会完成。
您的问题来自使用 mergeMap
后跟 scan
,并在 "outer" 流上这样做,而不是转换您的对象列表然后发出该列表。
另一种方式:如果您想要的是数组流,那么让我们始终将其保留为数组流。
/* Function that takes array and returns stream of transformed arrays */
function transform( objects ) {
return Observable.from( objects )
/* We only need this because your transformation is wrapped in a stream. */
.withLatestFrom( this.functionStream, ( val, fn ) => fn( val ) )
// some transformation that I want to have happen, in this case no-op
.filter(() => {return true});
.toArray();
}
this.scanned = this.objects
/* let's keep this as Observable<Array<YourObject>> */
.mergeMap( transform );
/* No need to "rebuild" array, this is now a stream of arrays */
请注意在这个例子中,我 "transform" 数组的元素在 transform
中,而不是尝试 "unwrap" 流中的数组然后重组。
这符合您的用例吗?
编辑:我在阅读您的 functionStream 签名时犯了一个错误,已更正。
我有一个带有 publishReplay 和 refcount 的主题,所以它实际上永远不会完成。 refcount 的输出是一个 Observable 对象数组。我想将 Observable 对象数组分解为发出的元素,根据需要进行转换,然后将它们合并回一个数组,但是由于源是非完成的,数组边界在异步性质中是 'lost'。
举个例子,我使用mergeMap和scan来积累数据。但是,正如预期的那样,它会不断累积,因此生成的数组是流的整个历史记录。扫描的初始值仅在流的开头进行初始化。我不能使用 toArray,因为 Observable 永远不会终止。
我在异步硬件设计方面有很多经验。这个异步链的硬件类比是使用锁存器。我不确定 RxJS 中的概念等价物是什么。我假设从 refcount 获取发出的数组输出并将其应用于 Observable.from(theArrayOutput) 会起作用,但我不知道如何将它插入流链中。
import {Component, OnInit} from '@angular/core';
import {Observable} from 'rxjs/Observable';
import {Subject} from 'rxjs/Subject';
type IObjectsOperation = (types: Object[]) => Object[];
@Component({
selector: 'app-events-test',
templateUrl: './events-test.component.html',
styleUrls: ['./events-test.component.css']
})
export class EventsTestComponent implements OnInit {
public objects: Observable<Object[]>;
public scanned: Observable<any>;
protected updates: Subject<any> = new Subject<any>();
protected functionStream: Subject<any> = new Subject<any>();
protected addStream: Subject<Object> = new Subject<Object>();
private initialObjects: Object[] = [1];
constructor() {
this.objects = this.updates
.scan((objects: Object[],
operation: IObjectsOperation) => {
return operation(objects);
},
this.initialObjects)
.publishReplay(1)
.refCount();
this.functionStream
.map(function (message: Object): IObjectsOperation {
return (messages: Object[]) => {
return messages.concat(message);
};
})
.subscribe(this.updates);
this.addStream.subscribe(this.functionStream);
this.scanned = this.objects
.mergeMap(val => val)
// some transformation that I want to have happen, in this case no-op
.filter(() => {return true})
// attempt to rebuild array, but items are accumulated
.scan( (acc: Array<any>, x: Object) => { return acc.concat(x); }, [])
}
transform(objects) {
return Observable.from(objects)
// this withLatestFrom suggestion didn't work
// .withLatestFrom( this.functionStream, ( val, fn ) => fn( val ) )
.filter(() => {
return true
})
.toArray()
}
start(): void {
console.log('---------STARTING');
this.objects.mergeMap(obj => this.transform(obj))
.subscribe(
obj => console.log('objects: ' + obj)
);
this.scanned.subscribe(
{
next: obj => {
console.log('scanned: ' + obj);
},
error: () => {
},
complete: () => console.log('COMPLETED')
}
);
this.add(2);
this.add(3);
}
add(object: Object): void {
this.addStream.next(object);
}
ngOnInit() {
this.start();
}
}
输出低于预期。
---------STARTING
events-test.component.ts:49 objects: 1,2
events-test.component.ts:52 scanned: 1
events-test.component.ts:52 scanned: 1,2
events-test.component.ts:49 objects: 1,2,3
events-test.component.ts:52 scanned: 1,2,1
events-test.component.ts:52 scanned: 1,2,1,2
events-test.component.ts:52 scanned: 1,2,1,2,3
我想看到的是:
---------STARTING
events-test.component.ts:49 objects: 1,2
events-test.component.ts:52 scanned: 1,2
events-test.component.ts:49 objects: 1,2,3
events-test.component.ts:52 scanned: 1,2,3
我有一些可以使用的解决方法:
- 在整个流中将数据保存为数组
- 将源非完成可观察到完成
我假设此架构的解决方案是:
- 我可以将一些操作插入到流中以从对象 Observable 中获取数组,return 一个完成的 Observable
- 有一些外部信号需要从源 Observable 发出,指示 'processing' 然后终止边界扫描。这对我来说似乎很笨拙。
我假设 RxJS 中会有一些等效于异步硬件锁存器的东西,所以我想保留当前的架构。此外,我认为 RxJS 真的很酷,并且想弥补我在基于时间的流处理方面的知识差距。
编辑:@xtianjohns 的一个很好的回答指出了如何进行内部循环,但订阅仍然没有完成。建议添加 withLatestFrom 导致转换函数中断,在注释掉该行时,数组已呈现,但外部循环未完成。如果存在该行,则不会呈现数组,并且循环不会完成。
您的问题来自使用 mergeMap
后跟 scan
,并在 "outer" 流上这样做,而不是转换您的对象列表然后发出该列表。
另一种方式:如果您想要的是数组流,那么让我们始终将其保留为数组流。
/* Function that takes array and returns stream of transformed arrays */
function transform( objects ) {
return Observable.from( objects )
/* We only need this because your transformation is wrapped in a stream. */
.withLatestFrom( this.functionStream, ( val, fn ) => fn( val ) )
// some transformation that I want to have happen, in this case no-op
.filter(() => {return true});
.toArray();
}
this.scanned = this.objects
/* let's keep this as Observable<Array<YourObject>> */
.mergeMap( transform );
/* No need to "rebuild" array, this is now a stream of arrays */
请注意在这个例子中,我 "transform" 数组的元素在 transform
中,而不是尝试 "unwrap" 流中的数组然后重组。
这符合您的用例吗?
编辑:我在阅读您的 functionStream 签名时犯了一个错误,已更正。