RxJs - 如何让 observable 表现得像队列
RxJs - how to make observable behave like queue
我正在努力实现下一个目标:
private beginTransaction(): Observable() {
..
}
private test(): void {
this.beginTransaction().subscribe((): void => {
this.commitTransaction();
});
this.beginTransaction().subscribe((): void => {
this.commitTransaction();
});
}
beginTransaction 可以并发调用,但应该延迟 observable 直到第一个或只有一个 beginTransaction 完成。
换句话说:任何时候只能进行一笔交易。
我尝试了什么:
private transactionInProgress: boolean = false;
private canBeginTransaction: Subject<void> = new Subject<void>();
private bla3(): void {
this.beginTransaction().subscribe((): void => {
console.log('beginTransaction 1');
this.commitTransaction();
});
this.beginTransaction().subscribe((): void => {
console.log('beginTransaction 2');
this.commitTransaction();
});
this.beginTransaction().subscribe((): void => {
console.log('beginTransaction 3');
this.commitTransaction();
});
}
private commitTransaction(): void {
this.transactionInProgress = false;
this.canBeginTransaction.next();
}
private beginTransaction(): Observable<void> {
if(this.transactionInProgress) {
return of(undefined)
.pipe(
skipUntil(this.canBeginTransaction),
tap((): void => {
console.log('begin transaction');
})
);
}
this.transactionInProgress = true;
return of(undefined);
}
我不确定我是否理解了问题,但在我看来 concatMap
是您正在寻找的运算符。
示例如下
const transactionTriggers$ = from([
't1', 't2', 't3'
])
function processTransation(trigger: string) {
console.log(`Start processing transation triggered by ${trigger}`)
// do whatever needs to be done and then return an Observable
console.log(`Transation triggered by ${trigger} processing ......`)
return of(`Transation triggered by ${trigger} processed`)
}
transactionTriggers$.pipe(
concatMap(trigger => processTransation(trigger)),
tap(console.log)
).subscribe()
您基本上从事件流开始,其中每个事件都应该触发交易处理。
然后你使用processTransaction
函数来做你必须做的任何事情来处理交易。 processTransactio
需要 return 一个 Observable,它在事务处理完成后发出处理结果。
然后在管道中,如果需要,您可以使用 tap
对处理结果做进一步的处理。
您可以试试this stackblitz中的代码。
你问的问题很笼统。毫无疑问,更受限制的场景可能看起来要简单得多。
无论如何,我在这里创建了一个管道,一次只允许 transaction(): Observable
订阅一次。
这可能是这样的:
/****
* Represents what each transaction does. Isn't concerned about
* order/timing/'transactionInProgress' or anything like that.
*
* Here is a fake transaction that just takes 3-5 seconds to emit
* the string: `Hello ${name}`
****/
function transaction(args): Observable<string> {
const name = args?.message;
const duration = 3000 + (Math.random() * 2000);
return of("Hello").pipe(
tap(_ => console.log("starting transaction")),
switchMap(v => timer(duration).pipe(
map(_ => `${v} ${name}`)
)),
tap(_ => console.log("Ending transation"))
);
}
// Track transactions
let currentTransactionId = 0;
// Start transactions
const transactionSubj = new Subject<any>();
// Perform transaction: concatMap ensures we only start a new one if
// there isn't a current transaction underway
const transaction$ = transactionSubj.pipe(
concatMap(({id, args}) => transaction(args).pipe(
map(payload => ({id, payload}))
)),
shareReplay(1)
);
/****
* Begin a new transaction, we give it an ID since transactions are
* "hot" and we don't want to return the wrong (earlier) transactions,
* just the current one started with this call.
****/
function beginTransaction(args): Observable<any> {
return defer(() => {
const currentId = currentTransactionId++;
transactionSubj.next({id: currentId, args});
return transaction$.pipe(
first(({id}) => id === currentId),
map(({payload}) => payload)
);
})
}
// Queue up 3 transactions, each one will wait for the previous
// one to complete before it will begin.
beginTransaction({message: "Dave"}).subscribe(console.log);
beginTransaction({message: "Tom"}).subscribe(console.log);
beginTransaction({message: "Tim"}).subscribe(console.log);
异步事务
当前设置要求交易是异步的,否则您可能会失去第一个交易。解决方法并不简单,因此我构建了一个订阅运算符,然后尽快调用一个函数。
这里是:
function initialize<T>(fn: () => void): MonoTypeOperatorFunction<T> {
return s => new Observable(observer => {
const bindOn = name => observer[name].bind(observer);
const sub = s.subscribe({
next: bindOn("next"),
error: bindOn("error"),
complete: bindOn("complete")
});
fn();
return {
unsubscribe: () => sub.unsubscribe
};
});
}
这里正在使用中:
function beginTransaction(args): Observable<any> {
return defer(() => {
const currentId = currentTransactionId++;
return transaction$.pipe(
initialize(() => transactionSubj.next({id: currentId, args})),
first(({id}) => id === currentId),
map(({payload}) => payload)
);
})
}
旁白:为什么要使用 defer
?
考虑重新编写 beginTransaction:
function beginTransaction(args): Observable<any> {
const currentId = currentTransactionId++;
return transaction$.pipe(
initialize(() => transactionSubj.next({id: currentId, args})),
first(({id}) => id === currentId),
map(({payload}) => payload)
);
}
在这种情况下,ID 是在您调用 beginTransaction
时设置的。
// The ID is set here, but it won't be used until subscribed
const preppedTransaction = beginTransaction({message: "Dave"});
// 10 seconds later, that ID gets used.
setTimeout(
() => preppedTransaction.subscribe(console.log),
10000
);
如果在没有初始化运算符的情况下调用 transactionSubj.next
,那么这个问题会变得更糟,因为 transactionSubj.next
也会在订阅 observable 之前 10 秒被调用(你肯定会错过输出)
问题继续:
如果你想订阅同一个 observable 两次怎么办?
const preppedTransaction = beginTransaction({message: "Dave"});
preppedTransaction.subscribe(
value => console.log("First Subscribe: ", value)
);
preppedTransaction.subscribe(
value => console.log("Second Subscribe: ", value)
);
我希望输出为:
First Subscribe: Hello Dave
Second Subscribe: Hello Dave
相反,您得到
First Subscribe: Hello Dave
First Subscribe: Hello Dave
Second Subscribe: Hello Dave
Second Subscribe: Hello Dave
因为您没有在订阅时获得新的ID,所以两个订阅共享一个ID。 defer
通过在订阅之前不分配 ID 来解决此问题。这在管理流中的错误时变得非常重要(让您在出错后重试 observable)。
我正在努力实现下一个目标:
private beginTransaction(): Observable() {
..
}
private test(): void {
this.beginTransaction().subscribe((): void => {
this.commitTransaction();
});
this.beginTransaction().subscribe((): void => {
this.commitTransaction();
});
}
beginTransaction 可以并发调用,但应该延迟 observable 直到第一个或只有一个 beginTransaction 完成。
换句话说:任何时候只能进行一笔交易。
我尝试了什么:
private transactionInProgress: boolean = false;
private canBeginTransaction: Subject<void> = new Subject<void>();
private bla3(): void {
this.beginTransaction().subscribe((): void => {
console.log('beginTransaction 1');
this.commitTransaction();
});
this.beginTransaction().subscribe((): void => {
console.log('beginTransaction 2');
this.commitTransaction();
});
this.beginTransaction().subscribe((): void => {
console.log('beginTransaction 3');
this.commitTransaction();
});
}
private commitTransaction(): void {
this.transactionInProgress = false;
this.canBeginTransaction.next();
}
private beginTransaction(): Observable<void> {
if(this.transactionInProgress) {
return of(undefined)
.pipe(
skipUntil(this.canBeginTransaction),
tap((): void => {
console.log('begin transaction');
})
);
}
this.transactionInProgress = true;
return of(undefined);
}
我不确定我是否理解了问题,但在我看来 concatMap
是您正在寻找的运算符。
示例如下
const transactionTriggers$ = from([
't1', 't2', 't3'
])
function processTransation(trigger: string) {
console.log(`Start processing transation triggered by ${trigger}`)
// do whatever needs to be done and then return an Observable
console.log(`Transation triggered by ${trigger} processing ......`)
return of(`Transation triggered by ${trigger} processed`)
}
transactionTriggers$.pipe(
concatMap(trigger => processTransation(trigger)),
tap(console.log)
).subscribe()
您基本上从事件流开始,其中每个事件都应该触发交易处理。
然后你使用processTransaction
函数来做你必须做的任何事情来处理交易。 processTransactio
需要 return 一个 Observable,它在事务处理完成后发出处理结果。
然后在管道中,如果需要,您可以使用 tap
对处理结果做进一步的处理。
您可以试试this stackblitz中的代码。
你问的问题很笼统。毫无疑问,更受限制的场景可能看起来要简单得多。
无论如何,我在这里创建了一个管道,一次只允许 transaction(): Observable
订阅一次。
这可能是这样的:
/****
* Represents what each transaction does. Isn't concerned about
* order/timing/'transactionInProgress' or anything like that.
*
* Here is a fake transaction that just takes 3-5 seconds to emit
* the string: `Hello ${name}`
****/
function transaction(args): Observable<string> {
const name = args?.message;
const duration = 3000 + (Math.random() * 2000);
return of("Hello").pipe(
tap(_ => console.log("starting transaction")),
switchMap(v => timer(duration).pipe(
map(_ => `${v} ${name}`)
)),
tap(_ => console.log("Ending transation"))
);
}
// Track transactions
let currentTransactionId = 0;
// Start transactions
const transactionSubj = new Subject<any>();
// Perform transaction: concatMap ensures we only start a new one if
// there isn't a current transaction underway
const transaction$ = transactionSubj.pipe(
concatMap(({id, args}) => transaction(args).pipe(
map(payload => ({id, payload}))
)),
shareReplay(1)
);
/****
* Begin a new transaction, we give it an ID since transactions are
* "hot" and we don't want to return the wrong (earlier) transactions,
* just the current one started with this call.
****/
function beginTransaction(args): Observable<any> {
return defer(() => {
const currentId = currentTransactionId++;
transactionSubj.next({id: currentId, args});
return transaction$.pipe(
first(({id}) => id === currentId),
map(({payload}) => payload)
);
})
}
// Queue up 3 transactions, each one will wait for the previous
// one to complete before it will begin.
beginTransaction({message: "Dave"}).subscribe(console.log);
beginTransaction({message: "Tom"}).subscribe(console.log);
beginTransaction({message: "Tim"}).subscribe(console.log);
异步事务
当前设置要求交易是异步的,否则您可能会失去第一个交易。解决方法并不简单,因此我构建了一个订阅运算符,然后尽快调用一个函数。
这里是:
function initialize<T>(fn: () => void): MonoTypeOperatorFunction<T> {
return s => new Observable(observer => {
const bindOn = name => observer[name].bind(observer);
const sub = s.subscribe({
next: bindOn("next"),
error: bindOn("error"),
complete: bindOn("complete")
});
fn();
return {
unsubscribe: () => sub.unsubscribe
};
});
}
这里正在使用中:
function beginTransaction(args): Observable<any> {
return defer(() => {
const currentId = currentTransactionId++;
return transaction$.pipe(
initialize(() => transactionSubj.next({id: currentId, args})),
first(({id}) => id === currentId),
map(({payload}) => payload)
);
})
}
旁白:为什么要使用 defer
?
考虑重新编写 beginTransaction:
function beginTransaction(args): Observable<any> {
const currentId = currentTransactionId++;
return transaction$.pipe(
initialize(() => transactionSubj.next({id: currentId, args})),
first(({id}) => id === currentId),
map(({payload}) => payload)
);
}
在这种情况下,ID 是在您调用 beginTransaction
时设置的。
// The ID is set here, but it won't be used until subscribed
const preppedTransaction = beginTransaction({message: "Dave"});
// 10 seconds later, that ID gets used.
setTimeout(
() => preppedTransaction.subscribe(console.log),
10000
);
如果在没有初始化运算符的情况下调用 transactionSubj.next
,那么这个问题会变得更糟,因为 transactionSubj.next
也会在订阅 observable 之前 10 秒被调用(你肯定会错过输出)
问题继续:
如果你想订阅同一个 observable 两次怎么办?
const preppedTransaction = beginTransaction({message: "Dave"});
preppedTransaction.subscribe(
value => console.log("First Subscribe: ", value)
);
preppedTransaction.subscribe(
value => console.log("Second Subscribe: ", value)
);
我希望输出为:
First Subscribe: Hello Dave
Second Subscribe: Hello Dave
相反,您得到
First Subscribe: Hello Dave
First Subscribe: Hello Dave
Second Subscribe: Hello Dave
Second Subscribe: Hello Dave
因为您没有在订阅时获得新的ID,所以两个订阅共享一个ID。 defer
通过在订阅之前不分配 ID 来解决此问题。这在管理流中的错误时变得非常重要(让您在出错后重试 observable)。