如何在 `filter` 中使用可观察对象?

How to use an observable within `filter`?

假设我有一个类似于以下的目录结构:

foo
|
+---one
|   +---tmp
|
+---two
|
+---three
    |
    +---tmp

我想获取 foo 下的子目录列表,其中有一个 tmp(子)子目录。我确实有以下代码可以做到这一点:

const { bindNodeCallback, from } = require('rxjs');
const { map, flatMap, filter } = require('rxjs/operators');
const { readdir, access, accessSync, constants: { F_OK, R_OK }} = require('fs');
const { join } = require('path');

const readdirRx = bindNodeCallback(readdir);

const FOO = './foo';

const result = readdirRx(FOO)
    .pipe(
        // readdir will return an array of files. Convert each
        // file into an observable, and then flatten it.
        flatMap(from),

        // get rid of directories that do not have a `tmp` dir underneath
        filter(dir => {
            try {
                accessSync(join(FOO, dir, 'tmp'), F_OK | R_OK);
                return true;
            } catch (err) {
                return false;
            }
        })
    );
result.subscribe(console.log, console.error, () => console.log('Done!'));
// outputs (correctly):
//   one
//   three
//   Done!

但是,这段代码对我来说看起来很糟糕,因为 (a) 我正在使用控制流异常,以及 (b) 有一个同步 accessSync 调用,这两者都会对性能产生不利影响。

我确实有以下响应式代码来检查同样的事情:

const fileExistsAndReadableRx = bindNodeCallback(
    // check if file exists and readable
    (file, cb) => access(file, F_OK | R_OK,
        // call the callback with true if no errors found
        err => cb(!err)
    )
);

但是,我不知道如何将 fileExistsAndReadableRx 插入到上面的程序中。我的 objective 是删除 try-catch 块并使用 fileExistsAndReadableRx 来过滤掉没有 tmp(子)子目录的子目录。我该怎么做?

(请注意,我尝试做的实际任务不是读取磁盘。我尝试做的是一个复杂的异步操作,我不得不想出一个更简单的例子来说明我的问题) .

到目前为止我尝试过的:

我尝试使用 map,但它发出了一系列 Observables,正如人们所期望的那样:

const result = readdirRx(FOO)
    .pipe(
        flatMap(from),
        map(dir =>
            fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
        )
    );
result.subscribe(console.log, console.error, () => console.log('Done!'));
// Outputs:
//   Observable { _isScalar: false, _subscribe: [Function] }
//   Observable { _isScalar: false, _subscribe: [Function] }
//   Observable { _isScalar: false, _subscribe: [Function] }
//   Done!

所以我想,我知道了!我会使用 flatMap 来展平 Observables。这应该产生三个布尔值,最终从那些 Observables 发出。但这也不起作用。它只发出一个值:

const result = readdirRx(FOO)
    .pipe(
        flatMap(from),
        flatMap(dir =>
            fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
        )
    );
result.subscribe(console.log, console.error, () => console.log('Done!'));
// Outputs:
//   true
//   Done!

编辑:

我试过 @IngoBurk's ,但它产生了一个布尔值。不是我期望的字符串列表:

const result = readdirRx(FOO)
    .pipe(
        flatMap(from),
        flatMap(dir => fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
            .pipe(
                filter(Boolean),
                map(() => dir),
            )
        ),
    );
result.subscribe(console.log, console.error, () => console.log('Done!'));
// Outputs:
//   true
//   Done!

你可以这样做:

readdirRx(FOO)
  .pipe(
    flatMap(from),
    flatMap(dir => fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
      .pipe(
        catchError(res => of(res)),
        filter(Boolean),
        map(() => dir),
      )
    ),
  )

假设fileExistsAndReadableRx returns Observable<boolean>filter(Boolean) 只是 filter(v => !!v).

的缩写

这里的技巧本质上是您尝试过的,即将每个目录(平面)映射到可观察到的 tmp 文件夹检查。然后我们使用该结果过滤掉那些不匹配的结果,然后将其映射回目录而不是那个中间结果。

您可以在此处查看实际效果:https://rxviz.com/v/d8djbkRO

you'd probably want to use cb(null, !err) to invoke the callback in fileExistsAndReadableRx