在流中累积和重置值
Accumulating and resetting values in a stream
我正在使用 RxJS 进行响应式编程,偶然发现了一些我不确定如何解决的问题。
假设我们实现了一台自动售货机。您投入一枚硬币,select 一件物品,然后机器分发一件物品和 return 零钱。我们假设价格始终为 1 美分,因此插入四分之一(25 美分)应该 return 返回 24 美分,依此类推。
"tricky" 部分是我希望能够处理用户插入 2 个硬币然后 select 购买物品的情况。或 select 不投币就购买物品。
将插入的硬币和 selected 项目实现为流似乎很自然。然后我们可以在这两个动作之间引入某种依赖关系——合并或压缩或合并最新的。
但是,我很快 运行 遇到了一个问题,我希望在分配物品之前积累金币,但不能再多了。 AFAIU,这意味着我不能使用 sum
或 scan
,因为在某些时候无法 "reset" 以前的积累。
这是一个示例图:
coins: ---25---5-----10------------|->
acc: ---25---30----40------------|->
items: ------------foo-----bar-----|->
combined: ---------30,foo--40,bar--|->
change:------------29------39------|->
以及相应的代码:
this.getCoinsStream()
.scan(function(sum, current) { return sum + current })
.combineLatest(this.getSelectedItemsStream())
.subscribe(function(cents, item) {
dispenseItem(item);
dispenseChange(cents - 1);
});
插入了 25 和 5 美分,然后 "foo" 项目被 selected。累积硬币然后合并最新的将导致 "foo" 与“30”(正确)合并,然后 "bar" 与“40”(不正确;应该是 "bar" 和“ 10").
我查看了 all of the methods 以进行分组和过滤,但没有看到任何我可以使用的东西。
我可以使用的另一种解决方案是单独积累硬币。但这在流之外引入了状态,我真的很想避免这种情况:
var centsDeposited = 0;
this.getCoinsStream().subscribe(function(cents) {
return centsDeposited += cents;
});
this.getSelectedItemsStream().subscribe(function(item) {
dispenseItem(item);
dispenseChange(centsDeposited - 1);
centsDeposited = 0;
});
此外,这不允许使流相互依赖,例如等待硬币被插入,直到 selected 操作可以 return 一个项目。
我是否缺少现有的方法?实现这样的目标的最佳方法是什么——累积值直到它们需要与另一个流合并的那一刻,但也在第一个流中等待至少 1 个值,然后再将它与第二个流中的值合并?
一般来说,您有以下一组方程式:
inserted_coins :: independent source
items :: independent source
accumulated_coins :: sum(inserted_coins)
accumulated_paid :: sum(price(items))
change :: accumulated_coins - accumulated_paid
coins_in_machine :: when items : 0, when inserted_coins : sum(inserted_coins) starting after last emission of item
最难的部分是coins_in_machine
。您需要根据来自两个来源的一些发射来切换可观察到的来源。
function emits ( who ) {
return function ( x ) { console.log([who, ": "].join(" ") + x);};
}
function sum ( a, b ) {return a + b;}
var inserted_coins = Rx.Observable.fromEvent(document.getElementById("insert"), 'click').map(function ( x ) {return 15;});
var items = Rx.Observable.fromEvent(document.getElementById("item"), 'click').map(function ( x ) {return "snickers";});
console.log("running");
var accumulated_coins = inserted_coins.scan(sum);
var coins_in_machine =
Rx.Observable.merge(
items.tap(emits("items")).map(function ( x ) {return {value : x, flag : 1};}),
inserted_coins.tap(emits("coins inserted ")).map(function ( x ) {return {value : x, flag : 0};}))
.distinctUntilChanged(function(x){return x.flag;})
.flatMapLatest(function ( x ) {
switch (x.flag) {
case 1 :
return Rx.Observable.just(0);
case 0 :
return inserted_coins.scan(sum, x.value).startWith(x.value);
}
}
).startWith(0);
coins_in_machine.subscribe(emits("coins in machine"));
jsbin : http://jsbin.com/mejoneteyo/edit?html,js,console,output
[更新]
解释:
- 我们将 insert_coins 流与 items 流合并,同时向它们附加一个标志,以了解当我们在合并流中接收到值时发出两者中的哪一个
- 当是items流发出时,我们要在
coins_in_machine
中放入0。当它是 insert_coins
时,我们想要对传入的值求和,因为该总和将代表机器中新的硬币数量。这意味着 insert_coins
的定义根据之前定义的逻辑从一个流切换到另一个流。该逻辑是在 switchMapLatest
. 中实现的
- 我使用
switchMapLatest
而不是 switchMap
否则 coins_in_machine
流将继续接收来自先前切换流的发射,即重复发射,因为最后只有我们切换到和从中切换的两个流。如果可以的话,我会说这是我们需要的关闭和切换。
switchMapLatest
必须 return 一个流,所以我们跳过箍来制作一个发出 0 和 never
结束的流(并且不会阻塞计算机,因为使用repeat
运营商会在那种情况下)
- 我们跳过一些额外的环节使
inserted_coins
发出我们想要的值。我的第一个实现是 inserted_coins.scan(sum,0)
,但从未奏效。关键是我发现这很棘手,当我们到达流程中的那个点时,inserted_coins
已经发出了一个值,它是总和的一部分。该值是作为 flatMapLatest
的参数传递的值,但它不再存在于源代码中,因此在事后调用 scan
将无法获取它,因此有必要从flatMapLatest
并重建正确的行为。
你可以使用你的 scan/combineLatest 方法,然后用 first
结束流,然后用 repeat
这样它 "starts over" 流但是你的观察者会没看到。
var coinStream = Rx.Observable.merge(
Rx.Observable.fromEvent($('#add5'), 'click').map(5),
Rx.Observable.fromEvent($('#add10'), 'click').map(10),
Rx.Observable.fromEvent($('#add25'), 'click').map(25)
);
var selectedStream = Rx.Observable.merge(
Rx.Observable.fromEvent($('#coke'), 'click').map('Coke'),
Rx.Observable.fromEvent($('#sprite'), 'click').map('sprite')
);
var $selection = $('#selection');
var $change = $('#change');
function dispense(selection) {
$selection.text('Dispensed: ' + selection);
console.log("Dispensing Drink: " + selection);
}
function dispenseChange(change) {
$change.text('Dispensed change: ' + change);
console.log("Dispensing Change: " + change);
}
var dispenser = coinStream.scan(function(acc, delta) { return acc + delta; }, 0)
.combineLatest(selectedStream,
function(coins, selection) {
return {coins : coins, selection : selection};
})
//Combine latest won't emit until both Observables have a value
//so you can safely get the first which will be the point that
//both Observables have emitted.
.first()
//First will complete the stream above so use repeat
//to resubscribe to the stream transparently
//You could also do this conditionally with while or doWhile
.repeat()
//If you only will subscribe once, then you won't need this but
//here I am showing how to do it with two subscribers
.publish();
//Dole out the change
dispenser.pluck('coins')
.map(function(c) { return c - 1;})
.subscribe(dispenseChange);
//Get the selection for dispensation
dispenser.pluck('selection').subscribe(dispense);
//Wire it up
dispenser.connect();
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
<button id="coke">Coke</button>
<button id="sprite">Sprite</button>
<button id="add5">5</button>
<button id="add10">10</button>
<button id="add25">25</button>
<div id="change"></div>
<div id="selection"></div>
您也可以使用Window将多个硬币事件组合在一起,并以项目选择为window边界。
接下来我们可以使用zip来获取item值。
请注意,我们会立即尝试分发物品。所以用户在决定一个项目之前确实必须投入硬币。
请注意,出于安全原因,我决定同时发布 selectedStream
和 dispenser
,我们不想在构建查询和 zip 时引发事件触发的竞争条件变得不平衡。那将是一种非常罕见的情况,但请注意,当我们的来源是冷 Observable 时,它们几乎会在我们订阅后立即开始生成,我们必须使用 Publish
来保护自己。
(无耻地盗取了 paulpdaniels 示例代码)。
var coinStream = Rx.Observable.merge(
Rx.Observable.fromEvent($('#add5'), 'click').map(5),
Rx.Observable.fromEvent($('#add10'), 'click').map(10),
Rx.Observable.fromEvent($('#add25'), 'click').map(25)
);
var selectedStream = Rx.Observable.merge(
Rx.Observable.fromEvent($('#coke'), 'click').map('Coke'),
Rx.Observable.fromEvent($('#sprite'), 'click').map('Sprite')
).publish();
var $selection = $('#selection');
var $change = $('#change');
function dispense(selection) {
$selection.text('Dispensed: ' + selection);
console.log("Dispensing Drink: " + selection);
}
function dispenseChange(change) {
$change.text('Dispensed change: ' + change);
console.log("Dispensing Change: " + change);
}
// Build the query.
var dispenser = Rx.Observable.zip(
coinStream
.window(selectedStream)
.flatMap(ob => ob.reduce((acc, cur) => acc + cur, 0)),
selectedStream,
(coins, selection) => ({coins : coins, selection: selection})
).filter(pay => pay.coins != 0) // Do not give out items if there are no coins.
.publish();
var dispose = new Rx.CompositeDisposable(
//Dole out the change
dispenser
.pluck('coins')
.map(function(c) { return c - 1;})
.subscribe(dispenseChange),
//Get the selection for dispensation
dispenser
.pluck('selection')
.subscribe(dispense),
//Wire it up
dispenser.connect(),
selectedStream.connect()
);
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
<button id="coke">Coke</button>
<button id="sprite">Sprite</button>
<button id="add5">5</button>
<button id="add10">10</button>
<button id="add25">25</button>
<div id="change"></div>
<div id="selection"></div>
我正在使用 RxJS 进行响应式编程,偶然发现了一些我不确定如何解决的问题。
假设我们实现了一台自动售货机。您投入一枚硬币,select 一件物品,然后机器分发一件物品和 return 零钱。我们假设价格始终为 1 美分,因此插入四分之一(25 美分)应该 return 返回 24 美分,依此类推。
"tricky" 部分是我希望能够处理用户插入 2 个硬币然后 select 购买物品的情况。或 select 不投币就购买物品。
将插入的硬币和 selected 项目实现为流似乎很自然。然后我们可以在这两个动作之间引入某种依赖关系——合并或压缩或合并最新的。
但是,我很快 运行 遇到了一个问题,我希望在分配物品之前积累金币,但不能再多了。 AFAIU,这意味着我不能使用 sum
或 scan
,因为在某些时候无法 "reset" 以前的积累。
这是一个示例图:
coins: ---25---5-----10------------|->
acc: ---25---30----40------------|->
items: ------------foo-----bar-----|->
combined: ---------30,foo--40,bar--|->
change:------------29------39------|->
以及相应的代码:
this.getCoinsStream()
.scan(function(sum, current) { return sum + current })
.combineLatest(this.getSelectedItemsStream())
.subscribe(function(cents, item) {
dispenseItem(item);
dispenseChange(cents - 1);
});
插入了 25 和 5 美分,然后 "foo" 项目被 selected。累积硬币然后合并最新的将导致 "foo" 与“30”(正确)合并,然后 "bar" 与“40”(不正确;应该是 "bar" 和“ 10").
我查看了 all of the methods 以进行分组和过滤,但没有看到任何我可以使用的东西。
我可以使用的另一种解决方案是单独积累硬币。但这在流之外引入了状态,我真的很想避免这种情况:
var centsDeposited = 0;
this.getCoinsStream().subscribe(function(cents) {
return centsDeposited += cents;
});
this.getSelectedItemsStream().subscribe(function(item) {
dispenseItem(item);
dispenseChange(centsDeposited - 1);
centsDeposited = 0;
});
此外,这不允许使流相互依赖,例如等待硬币被插入,直到 selected 操作可以 return 一个项目。
我是否缺少现有的方法?实现这样的目标的最佳方法是什么——累积值直到它们需要与另一个流合并的那一刻,但也在第一个流中等待至少 1 个值,然后再将它与第二个流中的值合并?
一般来说,您有以下一组方程式:
inserted_coins :: independent source
items :: independent source
accumulated_coins :: sum(inserted_coins)
accumulated_paid :: sum(price(items))
change :: accumulated_coins - accumulated_paid
coins_in_machine :: when items : 0, when inserted_coins : sum(inserted_coins) starting after last emission of item
最难的部分是coins_in_machine
。您需要根据来自两个来源的一些发射来切换可观察到的来源。
function emits ( who ) {
return function ( x ) { console.log([who, ": "].join(" ") + x);};
}
function sum ( a, b ) {return a + b;}
var inserted_coins = Rx.Observable.fromEvent(document.getElementById("insert"), 'click').map(function ( x ) {return 15;});
var items = Rx.Observable.fromEvent(document.getElementById("item"), 'click').map(function ( x ) {return "snickers";});
console.log("running");
var accumulated_coins = inserted_coins.scan(sum);
var coins_in_machine =
Rx.Observable.merge(
items.tap(emits("items")).map(function ( x ) {return {value : x, flag : 1};}),
inserted_coins.tap(emits("coins inserted ")).map(function ( x ) {return {value : x, flag : 0};}))
.distinctUntilChanged(function(x){return x.flag;})
.flatMapLatest(function ( x ) {
switch (x.flag) {
case 1 :
return Rx.Observable.just(0);
case 0 :
return inserted_coins.scan(sum, x.value).startWith(x.value);
}
}
).startWith(0);
coins_in_machine.subscribe(emits("coins in machine"));
jsbin : http://jsbin.com/mejoneteyo/edit?html,js,console,output
[更新]
解释:
- 我们将 insert_coins 流与 items 流合并,同时向它们附加一个标志,以了解当我们在合并流中接收到值时发出两者中的哪一个
- 当是items流发出时,我们要在
coins_in_machine
中放入0。当它是insert_coins
时,我们想要对传入的值求和,因为该总和将代表机器中新的硬币数量。这意味着insert_coins
的定义根据之前定义的逻辑从一个流切换到另一个流。该逻辑是在switchMapLatest
. 中实现的
- 我使用
switchMapLatest
而不是switchMap
否则coins_in_machine
流将继续接收来自先前切换流的发射,即重复发射,因为最后只有我们切换到和从中切换的两个流。如果可以的话,我会说这是我们需要的关闭和切换。 switchMapLatest
必须 return 一个流,所以我们跳过箍来制作一个发出 0 和never
结束的流(并且不会阻塞计算机,因为使用repeat
运营商会在那种情况下)- 我们跳过一些额外的环节使
inserted_coins
发出我们想要的值。我的第一个实现是inserted_coins.scan(sum,0)
,但从未奏效。关键是我发现这很棘手,当我们到达流程中的那个点时,inserted_coins
已经发出了一个值,它是总和的一部分。该值是作为flatMapLatest
的参数传递的值,但它不再存在于源代码中,因此在事后调用scan
将无法获取它,因此有必要从flatMapLatest
并重建正确的行为。
你可以使用你的 scan/combineLatest 方法,然后用 first
结束流,然后用 repeat
这样它 "starts over" 流但是你的观察者会没看到。
var coinStream = Rx.Observable.merge(
Rx.Observable.fromEvent($('#add5'), 'click').map(5),
Rx.Observable.fromEvent($('#add10'), 'click').map(10),
Rx.Observable.fromEvent($('#add25'), 'click').map(25)
);
var selectedStream = Rx.Observable.merge(
Rx.Observable.fromEvent($('#coke'), 'click').map('Coke'),
Rx.Observable.fromEvent($('#sprite'), 'click').map('sprite')
);
var $selection = $('#selection');
var $change = $('#change');
function dispense(selection) {
$selection.text('Dispensed: ' + selection);
console.log("Dispensing Drink: " + selection);
}
function dispenseChange(change) {
$change.text('Dispensed change: ' + change);
console.log("Dispensing Change: " + change);
}
var dispenser = coinStream.scan(function(acc, delta) { return acc + delta; }, 0)
.combineLatest(selectedStream,
function(coins, selection) {
return {coins : coins, selection : selection};
})
//Combine latest won't emit until both Observables have a value
//so you can safely get the first which will be the point that
//both Observables have emitted.
.first()
//First will complete the stream above so use repeat
//to resubscribe to the stream transparently
//You could also do this conditionally with while or doWhile
.repeat()
//If you only will subscribe once, then you won't need this but
//here I am showing how to do it with two subscribers
.publish();
//Dole out the change
dispenser.pluck('coins')
.map(function(c) { return c - 1;})
.subscribe(dispenseChange);
//Get the selection for dispensation
dispenser.pluck('selection').subscribe(dispense);
//Wire it up
dispenser.connect();
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
<button id="coke">Coke</button>
<button id="sprite">Sprite</button>
<button id="add5">5</button>
<button id="add10">10</button>
<button id="add25">25</button>
<div id="change"></div>
<div id="selection"></div>
您也可以使用Window将多个硬币事件组合在一起,并以项目选择为window边界。
接下来我们可以使用zip来获取item值。
请注意,我们会立即尝试分发物品。所以用户在决定一个项目之前确实必须投入硬币。
请注意,出于安全原因,我决定同时发布 selectedStream
和 dispenser
,我们不想在构建查询和 zip 时引发事件触发的竞争条件变得不平衡。那将是一种非常罕见的情况,但请注意,当我们的来源是冷 Observable 时,它们几乎会在我们订阅后立即开始生成,我们必须使用 Publish
来保护自己。
(无耻地盗取了 paulpdaniels 示例代码)。
var coinStream = Rx.Observable.merge(
Rx.Observable.fromEvent($('#add5'), 'click').map(5),
Rx.Observable.fromEvent($('#add10'), 'click').map(10),
Rx.Observable.fromEvent($('#add25'), 'click').map(25)
);
var selectedStream = Rx.Observable.merge(
Rx.Observable.fromEvent($('#coke'), 'click').map('Coke'),
Rx.Observable.fromEvent($('#sprite'), 'click').map('Sprite')
).publish();
var $selection = $('#selection');
var $change = $('#change');
function dispense(selection) {
$selection.text('Dispensed: ' + selection);
console.log("Dispensing Drink: " + selection);
}
function dispenseChange(change) {
$change.text('Dispensed change: ' + change);
console.log("Dispensing Change: " + change);
}
// Build the query.
var dispenser = Rx.Observable.zip(
coinStream
.window(selectedStream)
.flatMap(ob => ob.reduce((acc, cur) => acc + cur, 0)),
selectedStream,
(coins, selection) => ({coins : coins, selection: selection})
).filter(pay => pay.coins != 0) // Do not give out items if there are no coins.
.publish();
var dispose = new Rx.CompositeDisposable(
//Dole out the change
dispenser
.pluck('coins')
.map(function(c) { return c - 1;})
.subscribe(dispenseChange),
//Get the selection for dispensation
dispenser
.pluck('selection')
.subscribe(dispense),
//Wire it up
dispenser.connect(),
selectedStream.connect()
);
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
<button id="coke">Coke</button>
<button id="sprite">Sprite</button>
<button id="add5">5</button>
<button id="add10">10</button>
<button id="add25">25</button>
<div id="change"></div>
<div id="selection"></div>