如何在对象方法上使用 fromEvent 而不是 bindCallback?

How to use fromEvent instead of bindCallback on an object method?

我需要将 rn-fetch-blob 的 onData 回调方法连接到一个可观察对象。

据我所知,这不是一个事件,fromEventPattern无法使用。 如果这是我的问题的解决方案,我看不出如何使用 create

我发现 bindCallback 看起来很有希望,但文档说我可能应该改用 fromEvent:

Note that the Observable created by the output function will always emit a single value and then complete immediately. If func calls the callback multiple times, values from subsequent calls will not appear in the stream. If you need to listen for multiple calls, you probably want to use fromEvent or fromEventPattern instead.

我确实需要听多个电话。

无论如何,我正在尝试在文档中所示的对象方法上使用 bindCallback。 在我的 Typescript 文件中:

import { bindCallback } from 'rxjs';

在我的 class:

private emitter!: Observable<any>;

在私有方法中:

RNFetchBlob.fs
    .readStream(
      filePath,
      "utf8",
      -1,
      10
    )
    .then(ifstream => {
      ifstream.open();
      this.emitter = bindCallback(ifstream.onData);

但是编译失败:

error TS2322: Type '() => Observable<string | number[]>' is not assignable to type 'Observable<any>'.
  Property '_isScalar' is missing in type '() => Observable<string | number[]>'.

我真的看不出如何在我的案例中使用 fromEvent。

感谢任何帮助。

编辑:为寻找答案的人添加了工作代码:

RNFetchBlob.fs
            .readStream(
              // file path
              peripheral.name,
              // encoding, should be one of `base64`, `utf8`, `ascii`
              "utf8",
              // (optional) buffer size, default to 4096 (4095 for BASE64 encoded data)
              // when reading file in BASE64 encoding, buffer size must be multiples of 3.
              -1,
              10
            )
            .then(ifstream => {
              ifstream.open();
              this.emitter = new Observable(subscriber => {
                ifstream.onData(chunk => {
                  // chunk will be a string when encoding is 'utf8'
                  logging.logWithTimestamp(`Received [${chunk}]`);
                  subscriber.next(chunk);
                });
                ifstream.onError(err => {
                  logging.logWithTimestamp(`oops [${err}]`);
                  subscriber.error(err);
                });

                ifstream.onEnd(() => {
                  subscriber.complete();
                });
              });
              this.rxSubscription = this.emitter
                .pipe(
                  concatMap(value =>
                    this.handleUpdatedValuesComingFromCSVFile(value)
                  )
                )
                .subscribe();

以下是使用 fromEventPattern 的方法:

this.emitter = fromEventPattern(
  // tell fromEventPattern how to subscribe to the data
  handler => ifstream.onData(handler),
  // tell fromEventPattern how to unsubscribe from the data
  handler => ifstream.onData(null)
)

不太熟悉 rn-fetch-blob 但希望您能理解,同时您 return 一个 运行 清理逻辑的函数。

 const onDataObserevable=new Obserevable(obs=>{
    ifstream.onData(data=>obs.next(data))
    return ()=>{... you can add some clean up logic here like unsubcribe from source onData event}
 });

UPDATE

将整个链条转换为可观察的,希望你明白了

from(RNFetchBlob.fs.readStream(
              peripheral.name,
              "utf8",
              -1,
              10
            ))
            .pipe(mergeMap(ifstream => {
              ifstream.open();
              return new Observable(subscriber => {
                ifstream.onData(chunk => {
                  // chunk will be a string when encoding is 'utf8'
                  logging.logWithTimestamp(`Received [${chunk}]`);
                  subscriber.next(chunk);
                });

                ifstream.onError(err => {
                  logging.logWithTimestamp(`oops [${err}]`);
                  subscriber.error(err);
                });

                ifstream.onEnd(() => {
                  subscriber.complete();
                });

              });),
              mergeMap(value =>this.handleUpdatedValuesComingFromCSVFile(value)
                  )
              )