在流中累积和重置值

Accumulating and resetting values in a stream

我正在使用 RxJS 进行响应式编程,偶然发现了一些我不确定如何解决的问题。

假设我们实现了一台自动售货机。您投入一枚硬币,select 一件物品,然后机器分发一件物品和 return 零钱。我们假设价格始终为 1 美分,因此插入四分之一(25 美分)应该 return 返回 24 美分,依此类推。

"tricky" 部分是我希望能够处理用户插入 2 个硬币然后 select 购买物品的情况。或 select 不投币就购买物品。

将插入的硬币和 selected 项目实现为流似乎很自然。然后我们可以在这两个动作之间引入某种依赖关系——合并或压缩或合并最新的。

但是,我很快 运行 遇到了一个问题,我希望在分配物品之前积累金币,但不能再多了。 AFAIU,这意味着我不能使用 sumscan,因为在某些时候无法 "reset" 以前的积累。

这是一个示例图:

coins: ---25---5-----10------------|->
acc:   ---25---30----40------------|->
items: ------------foo-----bar-----|->
combined: ---------30,foo--40,bar--|->
change:------------29------39------|->

以及相应的代码:

this.getCoinsStream()
  .scan(function(sum, current) { return sum + current })
  .combineLatest(this.getSelectedItemsStream())
  .subscribe(function(cents, item) {
    dispenseItem(item);
    dispenseChange(cents - 1);
  });

插入了 25 和 5 美分,然后 "foo" 项目被 selected。累积硬币然后合并最新的将导致 "foo" 与“30”(正确)合并,然后 "bar" 与“40”(不正确;应该是 "bar" 和“ 10").

我查看了 all of the methods 以进行分组和过滤,但没有看到任何我可以使用的东西。

我可以使用的另一种解决方案是单独积累硬币。但这在流之外引入了状态,我真的很想避免这种情况:

var centsDeposited = 0;
this.getCoinsStream().subscribe(function(cents) {
  return centsDeposited += cents;
});

this.getSelectedItemsStream().subscribe(function(item) {
  dispenseItem(item);
  dispenseChange(centsDeposited - 1);
  centsDeposited = 0;
});

此外,这不允许使流相互依赖,例如等待硬币被插入,直到 selected 操作可以 return 一个项目。

我是否缺少现有的方法?实现这样的目标的最佳方法是什么——累积值直到它们需要与另一个流合并的那一刻,但也在第一个流中等待至少 1 个值,然后再将它与第二个流中的值合并?

一般来说,您有以下一组方程式:

inserted_coins :: independent source
items :: independent source
accumulated_coins :: sum(inserted_coins)
accumulated_paid :: sum(price(items))
change :: accumulated_coins - accumulated_paid
coins_in_machine :: when items : 0, when inserted_coins : sum(inserted_coins) starting after last emission of item

最难的部分是coins_in_machine。您需要根据来自两个来源的一些发射来切换可观察到的来源。

function emits ( who ) {
  return function ( x ) { console.log([who, ": "].join(" ") + x);};
}

function sum ( a, b ) {return a + b;}

var inserted_coins = Rx.Observable.fromEvent(document.getElementById("insert"), 'click').map(function ( x ) {return 15;});
var items = Rx.Observable.fromEvent(document.getElementById("item"), 'click').map(function ( x ) {return "snickers";});

console.log("running");

var accumulated_coins = inserted_coins.scan(sum);

var coins_in_machine =
    Rx.Observable.merge(
        items.tap(emits("items")).map(function ( x ) {return {value : x, flag : 1};}),
        inserted_coins.tap(emits("coins inserted ")).map(function ( x ) {return {value : x, flag : 0};}))
        .distinctUntilChanged(function(x){return x.flag;})
        .flatMapLatest(function ( x ) {
                   switch (x.flag) {
                     case 1 :
                       return Rx.Observable.just(0);
                     case 0 :
                       return inserted_coins.scan(sum, x.value).startWith(x.value);
                   }
                 }
    ).startWith(0);

coins_in_machine.subscribe(emits("coins in machine"));

jsbin : http://jsbin.com/mejoneteyo/edit?html,js,console,output

[更新]

解释:

  • 我们将 insert_coins 流与 items 流合并,同时向它们附加一个标志,以了解当我们在合并流中接收到值时发出两者中的哪一个
  • 当是items流发出时,我们要在coins_in_machine中放入0。当它是 insert_coins 时,我们想要对传入的值求和,因为该总和将代表机器中新的硬币数量。这意味着 insert_coins 的定义根据之前定义的逻辑从一个流切换到另一个流。该逻辑是在 switchMapLatest.
  • 中实现的
  • 我使用 switchMapLatest 而不是 switchMap 否则 coins_in_machine 流将继续接收来自先前切换流的发射,即重复发射,因为最后只有我们切换到和从中切换的两个流。如果可以的话,我会说这是我们需要的关闭和切换。
  • switchMapLatest 必须 return 一个流,所以我们跳过箍来制作一个发出 0 和 never 结束的流(并且不会阻塞计算机,因为使用repeat 运营商会在那种情况下)
  • 我们跳过一些额外的环节使 inserted_coins 发出我们想要的值。我的第一个实现是 inserted_coins.scan(sum,0),但从未奏效。关键是我发现这很棘手,当我们到达流程中的那个点时,inserted_coins 已经发出了一个值,它是总和的一部分。该值是作为 flatMapLatest 的参数传递的值,但它不再存在于源代码中,因此在事后调用 scan 将无法获取它,因此有必要从flatMapLatest 并重建正确的行为。

你可以使用你的 scan/combineLatest 方法,然后用 first 结束流,然后用 repeat 这样它 "starts over" 流但是你的观察者会没看到。

var coinStream = Rx.Observable.merge(
    Rx.Observable.fromEvent($('#add5'), 'click').map(5),
    Rx.Observable.fromEvent($('#add10'), 'click').map(10),
    Rx.Observable.fromEvent($('#add25'), 'click').map(25)
    );
var selectedStream = Rx.Observable.merge(
  Rx.Observable.fromEvent($('#coke'), 'click').map('Coke'),
  Rx.Observable.fromEvent($('#sprite'), 'click').map('sprite')
);

var $selection = $('#selection');
var $change = $('#change');

function dispense(selection) {
 $selection.text('Dispensed: ' + selection); 
 console.log("Dispensing Drink: " + selection);   
}

function dispenseChange(change) {
 $change.text('Dispensed change: ' + change); 
 console.log("Dispensing Change: " + change);   
}


var dispenser = coinStream.scan(function(acc, delta) { return acc + delta; }, 0)
                              .combineLatest(selectedStream, 
                                 function(coins, selection) {
                                   return {coins : coins, selection : selection};
                                 })

                              //Combine latest won't emit until both Observables have a value
                              //so you can safely get the first which will be the point that
                              //both Observables have emitted.
                              .first()

                              //First will complete the stream above so use repeat
                              //to resubscribe to the stream transparently
                              //You could also do this conditionally with while or doWhile
                              .repeat()

                              //If you only will subscribe once, then you won't need this but 
                              //here I am showing how to do it with two subscribers
                              .publish();

    
    //Dole out the change
    dispenser.pluck('coins')
             .map(function(c) { return c - 1;})
             .subscribe(dispenseChange);
    
    //Get the selection for dispensation
    dispenser.pluck('selection').subscribe(dispense);
    
    //Wire it up
    dispenser.connect();
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
<button id="coke">Coke</button>
<button id="sprite">Sprite</button>
<button id="add5">5</button>
<button id="add10">10</button>
<button id="add25">25</button>
<div id="change"></div>
<div id="selection"></div>

您也可以使用Window将多个硬币事件组合在一起,并以项目选择为window边界。

接下来我们可以使用zip来获取item值。

请注意,我们会立即尝试分发物品。所以用户在决定一个项目之前确实必须投入硬币。

请注意,出于安全原因,我决定同时发布 selectedStreamdispenser,我们不想在构建查询和 zip 时引发事件触发的竞争条件变得不平衡。那将是一种非常罕见的情况,但请注意,当我们的来源是冷 Observable 时,它​​们几乎会在我们订阅后立即开始生成,我们必须使用 Publish 来保护自己。

(无耻地盗取了 paulpdaniels 示例代码)。

var coinStream = Rx.Observable.merge(
    Rx.Observable.fromEvent($('#add5'), 'click').map(5),
    Rx.Observable.fromEvent($('#add10'), 'click').map(10),
    Rx.Observable.fromEvent($('#add25'), 'click').map(25)
);

var selectedStream = Rx.Observable.merge(
    Rx.Observable.fromEvent($('#coke'), 'click').map('Coke'),
    Rx.Observable.fromEvent($('#sprite'), 'click').map('Sprite')
).publish();

var $selection = $('#selection');
var $change = $('#change');

function dispense(selection) {
    $selection.text('Dispensed: ' + selection); 
    console.log("Dispensing Drink: " + selection);   
}

function dispenseChange(change) {
    $change.text('Dispensed change: ' + change); 
    console.log("Dispensing Change: " + change);   
}

// Build the query.
var dispenser = Rx.Observable.zip(
    coinStream
        .window(selectedStream)
        .flatMap(ob => ob.reduce((acc, cur) => acc + cur, 0)),
    selectedStream,
    (coins, selection) => ({coins : coins, selection: selection})
).filter(pay => pay.coins != 0) // Do not give out items if there are no coins.
.publish();

var dispose = new Rx.CompositeDisposable(
    //Dole out the change
    dispenser
        .pluck('coins')
        .map(function(c) { return c - 1;})
        .subscribe(dispenseChange),

    //Get the selection for dispensation
    dispenser
        .pluck('selection')
        .subscribe(dispense),

    //Wire it up
    dispenser.connect(),
    selectedStream.connect()
);
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
<button id="coke">Coke</button>
<button id="sprite">Sprite</button>
<button id="add5">5</button>
<button id="add10">10</button>
<button id="add25">25</button>
<div id="change"></div>
<div id="selection"></div>