RXJS:根据传入的可观察值发出额外值

RXJS: Emit extra value based upon observable value coming in

我是 RxJs 6.0(或与此相关的任何 RxJs 版本)的新手,虽然我知道它有多强大,但一些简单的概念让我难以理解。

我有一种情况,我想根据源流向输出流发出一个额外的值,但我一直不知道该怎么做。我真的需要一个可以采用方法而不是静态值的 startsWith 运算符,然后我就可以实现这一点。这是一些设置场景的愚蠢代码。

import { startWith, scan, tap, mergeMap, map, concat } from 'rxjs/operators';

interface IData {
  data: number;
  emitExtraVal: boolean;
}

class obsData implements IData {
  constructor(data: number) {
    this.data = data;
    this.emitExtraVal = false;
  }
  public data: number;
  public emitExtraVal: boolean;

}

class extraData implements IData {
  constructor(data: number) {
    this.data = data;
    this.emitExtraVal = true;
  }

  public data: number;
  public emitExtraVal: boolean;
}
const sourceOne = of(new obsData(1),new obsData(2),new obsData(3));
/*const finalSource = sourceOne.pipe(
  map((sData) => <IData>new extraData(sData.data)),
  map((sData) => sData)
);*/
const finalSource = sourceOne.pipe(
  mergeMap((sData) => concat(of(<IData>new extraData(sData.data), of(sData))))
);
const subscribe = finalSource.subscribe(val => console.log('Data:' + val.emitExtraVal));

我想要做的是输出一个extraData实例,其中包含obsData中的数字,后跟我刚刚从源中获取的obsData。这不是我正在尝试的确切场景,但它展示了我正在尝试做的事情的核心,即创建一个额外的输出,然后是另一个输出,这两个输出都依赖于单一源输入。

这个更新的问题示例是基于评论,但是这个示例不会运行因为语法不正确

这会产生以下错误:

您提供了 'function (source) { return source.lift.call(concat_1.concat.apply(void 0, [source].concat(observables))); }' 预期流的位置。您可以提供 Observable、Promise、Array 或 Iterable。

---更新--- 这是由于响应而有效的最终答案。我遇到的主要问题是您可以从 rxjs/operators 或 rxjs 导入 concat。如果你在管道命令中使用它,你必须从 rxjs 导入它。

// RxJS v6+
import { of, fromEvent, combineLatest, concat } from 'rxjs';
import { startWith, scan, tap, mergeMap, map } from 'rxjs/operators';

interface IData {
  data: number;
  emitExtraVal: boolean;
}

class obsData implements IData {
  constructor(data: number) {
    this.data = data;
    this.emitExtraVal = false;
  }
  public data: number;
  public emitExtraVal: boolean;

}

class extraData implements IData {
  constructor(data: number) {
    this.data = data;
    this.emitExtraVal = true;
  }

  public data: number;
  public emitExtraVal: boolean;
}
const sourceOne = of(new obsData(1),new obsData(2),new obsData(3));

const finalSource = sourceOne.pipe(
  mergeMap((sData) => concat(of(<IData>new extraData(sData.data), <IData>sData)))
);
const subscribe = finalSource.subscribe(val => console.log('Data:' + val.emitExtraVal));

create an extra output followed by another output, both of which rely on the a single source input.

在我的示例中,dataItem 是 "single source input",它使用 from 运算符转换为 Observable of it 元素。稍后你可以根据需要使用 flattening operators (like mergeMap or concatMap)将所有这些 Observables "flatten" 合并为一个。看例子:

const { Observable, of, from, concat } = rxjs; // = require("rxjs")
const { mergeMap } = rxjs.operators; // = require("rxjs/operators")

const complexAjaxCall = id => of(`${id}-from-ajax`);

const ids = [1, 2, 3];

from(ids).pipe(
  mergeMap(id => concat(
    complexAjaxCall(id),
    of(id)
  )),
).subscribe(e => console.log(e));
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>