我可以使用来自另一个可观察对象的多个事件来操作一个可观察对象吗?

Can I manipulate an observable with multiple events from another observable?

假设我有一组映射到输入字段的验证消息。假设我们从服务器获取这些验证消息。

我还有一个映射到输入字段的更改事件流。

每当我收到给定字段的更改事件时,我都想

  1. 查看该字段是否有相应的验证消息
  2. 删除验证消息

这里有一个例子,它有望说明为什么通常的嫌疑人(combineLatest,withLatestFrom)没有让我越过线。

combineLatest 的问题是:第 4 个更改事件将过滤第 2 个验证数组。

withLatestFrom 的问题是:第三个更改事件将撤消第二个更改事件的工作。

有人有什么建议吗?我是不是找错树了?我是否过度简化了问题?有没有完美的 RxJS 方法?

每次收到验证消息时,您似乎都在重新开始处理更改事件。在这种情况下,您可以尝试类似的方法:

filteredValidation$ = validation$.flatMapLatest(function ( aObjs ) {
  return changeEvent$.scan(function ( acc, changeEvent ) {
    return isIn(changeEvent, acc) ? removeChangeEvent(changeEvent, acc) : acc;
  }, aObjs);
});

isInremoveChangeEvent要根据change_event的具体形状来写,其实很简单

  • 每条验证消息都会启动一个新流
  • 该流将具有由 scan 函数中的累加器 acc 表示的状态,并且该状态本身是最新过滤的验证对象。

我还没有测试过,但希望它有用。

顺便说一句:我喜欢你的图表,它是你想要实现的处理的完美例证。

这是另一个使用不同技术的答案。这个想法是相似的(使用 scan 来记住以前的值,但是我们从一个依赖于源发出值的约简函数中得到当前值)。

jsfiddle : http://jsfiddle.net/guc1tm4m/1

var ta_validation = document.getElementById('ta_validation');
var ta_change = document.getElementById('ta_change');
var ta_result = document.getElementById('ta_result');

function emits ( who, who_ ) {return function ( x ) {
 who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n");
};}

function isIn (changeEvent, prev){
  return !!prev[0][changeEvent];
}

function removeChangeEvent(changeEvent, prev) {
delete prev[0][changeEvent];
  return prev;
}

 var validation$ = Rx.Observable
 .fromEvent(document.getElementById('validation'), 'click')
 .map(function(_, index){
   var values=[
     [{a:'req', b:'req', c:'req'}],
           [{d:'req', e:'req'}],
           [{x:'req', y:'req', z:'req'}]
   ];
   return values[index % values.length];
   })
 .do(emits(ta_validation, 'validation'))
 .share();

var changeEvent$ = Rx.Observable
 .fromEvent(document.getElementById('change'), 'click')
 .map(function(_, index){
   console.log('ssds', index)
   var values=['x', 'a', 'b', 'z'];
   return values[index % values.length];
   })
 .do(emits(ta_change, 'change'))
 .share();

var filteredValidation$ = Rx.Observable
  .merge(validation$.map(function (x) {return function (  ) {return x;}}),
         changeEvent$.map(function ( changeEvent ) {
           return function ( prev ) {
             return isIn(changeEvent, prev) ? removeChangeEvent(changeEvent, prev) : prev;
           };
         }))
  .scan(function ( prev, f ) {
          return f(prev);
        }, {})
  .do(emits(ta_result, 'result'));

validation$.subscribe(function(){    });
changeEvent$.subscribe(function(){    });
filteredValidation$.subscribe(function(){    });

我改编了上面的 jsfiddle 来展示我想出的解决方案。

jsfiddle: http://jsfiddle.net/hellosmithy/7kcoud2c/2/

基本上这个版本使用 Rx.Observable.window 根据来自 validation$ 流的新事件将 changeEvents$ 流分块。

var filteredValidation$ = validation$
    .merge(
        changeEvent$
        .window(validation$)
        .flatMapLatest(changeWindow => {
            return changeWindow.scan((acc, next) => acc.add(next), new Set());
        )
        .withLatestFrom(validation$, (filterSet, errs) => {
            return errs.filter(err => !filterSet.has(err.key));
        })
    )
    .distinctUntilChanged()