如何在链接的可观察对象之间传递结果

How to pass results between chained observables

抽象问题: 每次源 Observable 发出事件时,需要触发一系列 API 调用和 Angular 服务。其中一些调用取决于以前的结果。

在我的示例中,源 Observable startUpload$ 触发了一系列依赖调用。

使用解构可以这样写:

this.startUploadEvent$.pipe(
      concatMap(event => this.getAuthenticationHeaders(event)),
      map(({ event, headers }) => this.generateUploadId(event, headers)),
      tap(({ event, headers, id }) => this.emitUploadStartEvent(id, event)),
      concatMap(({ event, headers, id }) => this.createPdfDocument(event, headers, id)),
      concatMap(({ event, headers, id, pdfId }) => this.uploadBilderForPdf(event, pdfId, headers, id)),
      mergeMap(({ event, headers, id, pdfId, cloudId }) => this.closePdf(cloudId, event, headers, id, pdfId)),
      tap(({ event, headers, id, pdfId, cloudId }) => this.emitUploadDoneEvent(id, event, cloudId)),
).subscribe()

它几乎读起来像是命令式方法。但是它有一些问题:

_

private closePdf(cloudId, event, headers, id, pdfId) {
    return this.httpClient.post(..., { headers } )
        .pipe(
             //...,
             map(() => ({ event, headers, id, pdfId, cloudId }))
        )
}

如果编译器能够处理样板文件(比如 async await)来编写这样的代码(带有上述 none 的问题),那就太好了:

private startUpload(event: StartUploadEvent) {
    const headers = this.getAuthenticationHeaders(event)
    const id = this.generateUploadId()

    this.emitUploadStartEvent(id, event)

    const pdfId = this.createPdfDocument(event, headers, id)
    this.uploadBilderForPdf(event, pdfId, headers, id)

    const cloudId = this.closePdf(headers, pdfId)
    this.emitUploadDoneEvent(id, event, cloudId)

    return cloudId
  }

如何在链接的可观察对象之间传递结果而不会出现我提到的问题?有没有我错过的 rxjs 概念?

你能为数据集使用一个对象吗?像这样:

接口:

export interface Packet {
  event: string;
  headers?: string;
  id?: number;
  pdfId?: number;
  cloudId?: number;
}

然后在代码中,像这样:

服务:

  this.startUploadEvent$.pipe(
    concatMap(packet => this.doThingOne(packet)),
    map(packet => this.doThingTwo(packet)),
    tap(packet => this.doThingThree(packet)),
    // ...
  );

这样每个方法都可以使用它需要的对象的位并传递其余部分。虽然这确实需要更改每个方法以接收和使用对象。

关于此类代码产生的问题,您是正确的,抽象的解决方案是将组合结果并将正确参数传递给每个调用的责任从方法转移到管道。

可以很容易地完成一些改进。 tap 运算符不会修改值,因此您可以从解构中删除不需要的属性。 map 只是转换结果,所以

map(({ event, headers }) => this.generateUploadId(event, headers)),

我们可以写

map(({ event, headers }) => ({
  event,
  headers,
  id: this.generateUploadId(event, headers)
}))

this.generateUploadId不再需要return一个对象。

至于 high-order 映射运算符,我想到了几个选项。 首先,大多数 'xMap' 运算符都支持结果选择器作为最后一个参数,其目的正是我们所需要的——将源值与结果结合起来。结果选择器是 depricated,因此嵌套管道是当前的方式,但让我们看一下使用结果选择器

会是什么样子

选项 0. 结果选择器(已弃用)

this.startUploadEvent$
  .pipe(
    concatMap(
      event => this.getAuthenticationHeaders(event),
      (event, headers) => ({ event, headers }) // <-- Result Selector
    )
  );

选项 1. 嵌套管道(又名“使用闭包”)

它看起来与选项 0 非常相似,但 event 保留在闭包中而不是内部可观察的。

this.startUploadEvent$
  .pipe(
    concatMap(
      event => this.getAuthenticationHeaders(event)
        .pipe(map(headers => ({ event, headers })))
    )
  );

选项 2. 自定义运算符(此处也包含闭包)

可以制作自定义运算符并获得与结果选择器非常相似的语法

function withResultSelector(operator, transformer) {
  let sourceValue;
  return pipe(
    tap(value => (sourceValue = value)),
    operator,
    map(value => transformer(sourceValue, value))
  );
}

用法:

this.startUploadEvent$
  .pipe(
    withResultSelector(
      concatMap(event => this.getAuthenticationHeaders(event)),
      (event, headers) => ({ event, headers })
    )
  );

更进一步,可以提取重复的内容并使一切更实用:

const mergeAs = propName => (a, b) => ({ ...a, [propName]: b });
const opAndMergeAs = (operator, propName) => withResultSelector(operator, mergeAs(propName));

this.startUploadEvent$
  .pipe(
    opAndMergeAs(concatMap(event => this.getAuthenticationHeaders(event)), "headers")
  );

为此编写适当的类型可能有点麻烦,但这是一个不同的问题

Playground我以前写过答案

你的方法绝对不应该与上下文耦合,也不要考虑将结果映射到特定形状。

RxJS 完全是关于函数式编程的。在函数式编程中有一种模式,例如 Adapting Arguments to Parametersref

它允许我们将方法签名与上下文分离。

为了实现这一点,您可以编写 mapcontentMapmergMap 运算符的上下文相关版本,以便最终解决方案如下所示:

this.startUploadEvent$.pipe(
      map(withKey('event')),
      concatMap_(({event}) => this.getAuthenticationHeaders(event), 'headers'),
      map_(({ headers }) => this.generateUploadId(headers), 'id'),
      tap(({ event, id }) => this.emitUploadStartEvent(id, event)),
      concatMap_(({ id }) => this.createPdfDocument(id), 'pdfId'),
      concatMap_(({ pdfId }) => this.uploadBuilderForPdf(pdfId), 'cloudId'),
      mergeMap_(({ cloudId }) => this.closePdf(cloudId)),
      tap(({id, event, cloudId}) => this.emitUploadDoneEvent(id, event, cloudId)),
    ).subscribe(console.log);

注意这些运算符后的 _

Stackblitz Example

那些自定义运算符的目标是通过投影函数获取参数对象并将投影结果添加到原始参数对象。

function map_<K extends string, P, V>(project: (params: P) => V): OperatorFunction<P, P>;
function map_<K extends string, P, V>(project: (params: P) => V, key: K): OperatorFunction<P, P & Record<K, V>>;
function map_<K extends string, P, V>(project: (params: P) => V, key?: K): OperatorFunction<P, P> {
  return map(gatherParams(project, key));
}

function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>): OperatorFunction<P, P>;
function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key: K): OperatorFunction<P, P & Record<K, V>>;
function concatMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key?: K): OperatorFunction<P, P> {
  return concatMap(gatherParamsOperator(projection, key));
}

function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>): OperatorFunction<P, P>;
function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key: K): OperatorFunction<P, P & Record<K, V>>;
function mergeMap_<K extends string, P, V>(projection: (params: P) => Observable<V>, key?: K): OperatorFunction<P, P> {
  return mergeMap(gatherParamsOperator(projection, key));
}

// https://github.com/Microsoft/TypeScript/wiki/FAQ#why-am-i-getting-supplied-parameters-do-not-match-any-signature-error
function gatherParams<K extends string, P, V>(fn: (params: P) => V): (params: P) => P;
function gatherParams<K extends string, P, V>(fn: (params: P) => V, key: K): (params: P) => P & Record<K, V>;
function gatherParams<K extends string, P, V>(fn: (params: P) => V, key?: K): (params: P) => P {
  return (params: P) => {
    if (typeof key === 'string') {
      return Object.assign({}, params, { [key]: fn(params) } as Record<K, V>);
    }

    return params;
  };
}

function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>): (params: P) => Observable<P>;
function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>, key: K): (params: P) => Observable<P & Record<K, V>>;
function gatherParamsOperator<K extends string, P, V>(fn: (params: P) => Observable<V>, key?: K): (params: P) => Observable<P> {
  return (params: P) => {
    return fn(params).pipe(map(value => gatherParams((_: P) => value, key)(params)));
  };
}

function withKey<K extends string, V>(key: K): (value: V) => Record<K, V> {
  return (value: V) => ({ [key]: value } as Record<K, V>);
}

我在这里使用了函数重载,因为有时我们不需要为参数添加额外的键。参数应该只在 this.closePdf(...) 方法的情况下通过它。

因此,您将获得与之前具有类型安全性的版本相同的解耦版本:

是不是很像over-engineering?

在大多数情况下,您应该遵循 YAGNI(您不会需要它)原则。最好不要给现有代码增加更多的复杂性。对于这种情况,您应该坚持在运算符之间共享参数的一些简单实现,如下所示:

ngOnInit() {
  const params: Partial<Params> = {};
  this.startUploadEvent$.pipe(
    concatMap(event => (params.event = event) && this.getAuthenticationHeaders(event)),
    map(headers => (params.headers = headers) && this.generateUploadId(headers)),
    tap(id => (params.uploadId = id) && this.emitUploadStartEvent(id, event)),
    concatMap(id => this.createPdfDocument(id)),
    concatMap(pdfId => (params.pdfId = pdfId) && this.uploadBuilderForPdf(pdfId)),
    mergeMap(cloudId => (params.cloudId = cloudId) && this.closePdf(cloudId)),
    tap(() => this.emitUploadDoneEvent(params.pdfId, params.cloudId, params.event)),
  ).subscribe(() => {
    console.log(params)
  });

其中 Params 类型是:

interface Params {
  event: any;
  headers: any;
  uploadId: any;
  pdfId: any;
  cloudId: any;
}

请注意我在作业中使用的括号 (params.cloudId = cloudId)

Stackblitz Example


还有很多其他方法,但它们需要更改您使用 rxjs 运算符的流程:

据我了解,您关心的是可读性以及不必在方法之间传递负载。

你有没有想过将 Observable 转换为 Promise?这里重要的是,observables 必须完成,这样 promise 才能实现并可以被解决(与 complete 相同,但仅用于 promise)。

由于你的建议,见上文(与异步等待一样)我得出了这个建议。

private async startUpload(event: StartUploadEvent) {
    const headers = await this.getAuthenticationHeaders(event).toPromise();
    const id = await this.generateUploadId().toPromise();
    
    this.emitUploadStartEvent(id, event);
    
    const pdfId = await this.createPdfDocument(event, headers, id).toPromise();
    await this.uploadBilderForPdf(event, pdfId, headers, id).toPromise();
    
    const cloudId = await this.closePdf(headers, pdfId).toPromise();
    this.emitUploadDoneEvent(id, event, cloudId)
    
    return cloudId
}

信息: 在这里您可以阅读如果您在未完成可观察对象的情况下将可观察对象转换为承诺会发生什么:

注意:我正在满足您的期望

And maybe there are other ways to solve the problem which are not violating common best practices

您可以:

  • 将每个动作的结果分配给一个 observable

  • 根据早期结果链接后续函数调用

  • 这些结果可以通过 withLatestFrom

    在以后的操作调用中重复使用
  • shareReplay 用于防止后来的 withLatestFrom 订阅导致较早的功能 re-execute

    function startUpload(event$: Observable<string>) {
      const headers$ = event$.pipe(
        concatMap(event => getAuthenticationHeaders(event)),
        shareReplay()
        );
    
      const id$ = headers$.pipe(
        map(() => generateUploadId()),
        shareReplay()
        );
    
      const emitUploadEvent$ = id$.pipe(
        withLatestFrom(event$),   // use earlier result
        map(([id, event]) => emitUploadStartEvent(id, event)),
        shareReplay()
        );
    
       // etc
    }
    

如上,函数只接受他们需要的参数,没有pass-through。

演示:https://stackblitz.com/edit/so-rxjs-chaining-1?file=index.ts

可以使用 rxjs 自定义运算符简化此模式(请注意,这可以进一步完善,包括键入):

function call<T, R, TArgs extends any[], OArgs extends Observable<any>[]>(
  operator: (func: ((a: TArgs) => R)) => OperatorFunction<TArgs,R>,
  action: (...args: any[]) => R,
  ignoreInput: boolean,
  ...observableArgs: OArgs
): (args: Observable<T>) => Observable<R> {
  return (input: Observable<T>) => input.pipe(
    withLatestFrom(...observableArgs),
    operator((args: any[]) => action(...args.slice(ignoreInput ? 1: 0))),
    shareReplay(1)
  );
}

可以这样使用:

function startUpload(event$: Observable<string>) {
  const headers$ = event$.pipe(
    call(concatMap, getAuthenticationHeaders, true)
  );

  const id$ = headers$.pipe(
    call(map, generateUploadId, false)
  );

  const startEmitted$ = id$.pipe(
    call(map, emitUploadStartEvent, true, event$)
  );

  const pdfId$ = startEmitted$.pipe(
    call(map, createPdfDocument, false, event$, headers$, id$)
  );

  const uploaded$ = pdfId$.pipe(
    call(map, uploadBuilderForPdf, false, event$, pdfId$, headers$, id$)
  );

  const cloudId$ = uploaded$.pipe(
    call(map, closePdf, false, headers$, pdfId$)
  );

  const uploadDone$ = cloudId$.pipe(
    call(map, emitUploadDoneEvent, true, id$, event$)
  );

  // return cloudId$ instead of uploadDone$ but preserve observable chain
  return uploadDone$.pipe(concatMap(() => cloudId$));    
}

演示:https://stackblitz.com/edit/so-rxjs-chaining-4?file=index.ts

你当然不应该让你的方法接受与它们无关的参数!

关于您的主要问题:

How to pass results between chained observables without the problems i've mentioned?

使用单一作用域(嵌套管道)

下面的代码等同于您的示例代码,不需要传递不必要的属性。之前返回的值可以通过链下的函数调用访问:

1   startUploadEvent$.pipe(
2     concatMap(event => getAuthenticationHeaders(event).pipe(
3       map(headers => generateUploadId(event, headers).pipe(
4         tap(id => emitUploadStartEvent(id, event)),
5         concatMap(id => createPdfDocument(event, headers, id)),
6         concatMap(pdfId => uploadBilderForPdf(event, pdfId)),
7         tap(cloudId => closePdf(cloudId, event))
8       ))
9     ))
10  ).subscribe();

注意如何在下游访问 eventheaders。它们不需要传递到不需要它们的函数中。

Is there a rxjs concept i've missed?

也许。?不是真的......:-)

诀窍是添加 .pipe 以有效地对运算符进行分组,以便它们都可以访问输入参数。

通常,我们尽量让代码在 .pipe:

1   const greeting$ = userId$.pipe(
2     switchMap(id => http.get(`/users/${id}`)),
3     map(response => response.data.userName),
4     map(name => `Hello ${name}!`),
5     tap(greeting => console.log(greeting))
6   );

但该代码实际上与:

没有什么不同
1   const greeting$ = userId$.pipe(
2     switchMap(id => http.get(`/users/${id}`).pipe(
3       map(response => response.data.userName),
4       map(name => `Hello ${name}! (aka User #${id})`)
5     )),
6     tap(greeting => console.log(greeting))
7   );

但是,在第二种情况下,第 4 行可以访问 nameid,而在第一种情况下,它只能访问 name.

注意第一个签名是userId$.pipe(switchMap(), map(), map(), tap())

第二个是:userId$.pipe(switchMap(), tap()).

你提到的这些担忧和问题是对的,但我在这里看到的问题是将你的思维方式从命令式方法转变为 Reactive/Functional 方法,但让我们先回顾一下命令式代码

private startUpload(event: StartUploadEvent) {
    const headers = this.getAuthenticationHeaders(event)
    const id = this.generateUploadId()

    this.emitUploadStartEvent(id, event)

    const pdfId = this.createPdfDocument(event, headers, id)
    this.uploadBilderForPdf(event, pdfId, headers, id)

    const cloudId = this.closePdf(headers, pdfId)
    this.emitUploadDoneEvent(id, event, cloudId)

    return cloudId
}

在这里你看到的东西比你有 event 更干净,你可以传递并只获取你想要的并将它传递给下一个函数,我们想将这段代码移动到 Reactive/Functional方法。

从我的角度来看,主要问题是你让你的函数失去了它们所拥有的上下文,例如 getAuthenticationHeaders 根本不应该 return event 它应该只 return headers 其他函数同理。

在处理 RxJS(又名 Reactive Approach)时,您会经常处理这些问题,这没关系,因为它保留了函数式概念并使您的代码更具可预测性,因为 pure 运算符应该只处理同一管道中的数据,保持一切纯净,不会导致导致不可预测代码的副作用。

我想你要找的东西会用 nested pipes 来解决(这是我认为最好的解决方案)

concatMap(event => this.getAuthenticationHeaders(event).pipe(
    map(headers => this.generateUploadId(event, headers).pipe())
))

并且它在一些 RxJS 后端库中大量使用,例如 Marble.js

您可以使用类似于 Result Selector:

的方法
concatMap(event => this.getAuthenticationHeaders(event).pipe(
  map(headers => ({ headers, event }))
)),

或人们建议的其他很棒的解决方案将使它起作用,但您仍然会遇到与您提到的相同的问题,但需要更多 clean/readable 代码。

您也可以将其转为 async/await 方法,但您将失去 RxJS 提供给您的反应性。

我可以建议的是尝试阅读更多关于反应式编程的内容以及你如何将你的思维转移到它上,我将在此处提供一些链接,我认为这些链接非常适合作为开始并尝试一些库建立在 RxJS 之上,例如 CycleJS and I recommend to read about Functional Programming which will help a lot also from this great books Mostly adequate guide to FP (in javascript) & Composing Software.

我推荐这个精彩的讲座 RxJS Recipes,它将改变您使用 RxJS 的方式。

有用的资源: