如何在 `filter` 中使用可观察对象?
How to use an observable within `filter`?
假设我有一个类似于以下的目录结构:
foo
|
+---one
| +---tmp
|
+---two
|
+---three
|
+---tmp
我想获取 foo
下的子目录列表,其中有一个 tmp
(子)子目录。我确实有以下代码可以做到这一点:
const { bindNodeCallback, from } = require('rxjs');
const { map, flatMap, filter } = require('rxjs/operators');
const { readdir, access, accessSync, constants: { F_OK, R_OK }} = require('fs');
const { join } = require('path');
const readdirRx = bindNodeCallback(readdir);
const FOO = './foo';
const result = readdirRx(FOO)
.pipe(
// readdir will return an array of files. Convert each
// file into an observable, and then flatten it.
flatMap(from),
// get rid of directories that do not have a `tmp` dir underneath
filter(dir => {
try {
accessSync(join(FOO, dir, 'tmp'), F_OK | R_OK);
return true;
} catch (err) {
return false;
}
})
);
result.subscribe(console.log, console.error, () => console.log('Done!'));
// outputs (correctly):
// one
// three
// Done!
但是,这段代码对我来说看起来很糟糕,因为 (a) 我正在使用控制流异常,以及 (b) 有一个同步 accessSync
调用,这两者都会对性能产生不利影响。
我确实有以下响应式代码来检查同样的事情:
const fileExistsAndReadableRx = bindNodeCallback(
// check if file exists and readable
(file, cb) => access(file, F_OK | R_OK,
// call the callback with true if no errors found
err => cb(!err)
)
);
但是,我不知道如何将 fileExistsAndReadableRx
插入到上面的程序中。我的 objective 是删除 try-catch 块并使用 fileExistsAndReadableRx
来过滤掉没有 tmp
(子)子目录的子目录。我该怎么做?
(请注意,我尝试做的实际任务不是读取磁盘。我尝试做的是一个复杂的异步操作,我不得不想出一个更简单的例子来说明我的问题) .
到目前为止我尝试过的:
我尝试使用 map
,但它发出了一系列 Observables,正如人们所期望的那样:
const result = readdirRx(FOO)
.pipe(
flatMap(from),
map(dir =>
fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
)
);
result.subscribe(console.log, console.error, () => console.log('Done!'));
// Outputs:
// Observable { _isScalar: false, _subscribe: [Function] }
// Observable { _isScalar: false, _subscribe: [Function] }
// Observable { _isScalar: false, _subscribe: [Function] }
// Done!
所以我想,我知道了!我会使用 flatMap
来展平 Observables。这应该产生三个布尔值,最终从那些 Observables 发出。但这也不起作用。它只发出一个值:
const result = readdirRx(FOO)
.pipe(
flatMap(from),
flatMap(dir =>
fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
)
);
result.subscribe(console.log, console.error, () => console.log('Done!'));
// Outputs:
// true
// Done!
编辑:
我试过 @IngoBurk's ,但它产生了一个布尔值。不是我期望的字符串列表:
const result = readdirRx(FOO)
.pipe(
flatMap(from),
flatMap(dir => fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
.pipe(
filter(Boolean),
map(() => dir),
)
),
);
result.subscribe(console.log, console.error, () => console.log('Done!'));
// Outputs:
// true
// Done!
你可以这样做:
readdirRx(FOO)
.pipe(
flatMap(from),
flatMap(dir => fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
.pipe(
catchError(res => of(res)),
filter(Boolean),
map(() => dir),
)
),
)
假设fileExistsAndReadableRx
returns Observable<boolean>
。 filter(Boolean)
只是 filter(v => !!v)
.
的缩写
这里的技巧本质上是您尝试过的,即将每个目录(平面)映射到可观察到的 tmp 文件夹检查。然后我们使用该结果过滤掉那些不匹配的结果,然后将其映射回目录而不是那个中间结果。
您可以在此处查看实际效果:https://rxviz.com/v/d8djbkRO
you'd probably want to use cb(null, !err)
to invoke the callback in fileExistsAndReadableRx
假设我有一个类似于以下的目录结构:
foo
|
+---one
| +---tmp
|
+---two
|
+---three
|
+---tmp
我想获取 foo
下的子目录列表,其中有一个 tmp
(子)子目录。我确实有以下代码可以做到这一点:
const { bindNodeCallback, from } = require('rxjs');
const { map, flatMap, filter } = require('rxjs/operators');
const { readdir, access, accessSync, constants: { F_OK, R_OK }} = require('fs');
const { join } = require('path');
const readdirRx = bindNodeCallback(readdir);
const FOO = './foo';
const result = readdirRx(FOO)
.pipe(
// readdir will return an array of files. Convert each
// file into an observable, and then flatten it.
flatMap(from),
// get rid of directories that do not have a `tmp` dir underneath
filter(dir => {
try {
accessSync(join(FOO, dir, 'tmp'), F_OK | R_OK);
return true;
} catch (err) {
return false;
}
})
);
result.subscribe(console.log, console.error, () => console.log('Done!'));
// outputs (correctly):
// one
// three
// Done!
但是,这段代码对我来说看起来很糟糕,因为 (a) 我正在使用控制流异常,以及 (b) 有一个同步 accessSync
调用,这两者都会对性能产生不利影响。
我确实有以下响应式代码来检查同样的事情:
const fileExistsAndReadableRx = bindNodeCallback(
// check if file exists and readable
(file, cb) => access(file, F_OK | R_OK,
// call the callback with true if no errors found
err => cb(!err)
)
);
但是,我不知道如何将 fileExistsAndReadableRx
插入到上面的程序中。我的 objective 是删除 try-catch 块并使用 fileExistsAndReadableRx
来过滤掉没有 tmp
(子)子目录的子目录。我该怎么做?
(请注意,我尝试做的实际任务不是读取磁盘。我尝试做的是一个复杂的异步操作,我不得不想出一个更简单的例子来说明我的问题) .
到目前为止我尝试过的:
我尝试使用 map
,但它发出了一系列 Observables,正如人们所期望的那样:
const result = readdirRx(FOO)
.pipe(
flatMap(from),
map(dir =>
fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
)
);
result.subscribe(console.log, console.error, () => console.log('Done!'));
// Outputs:
// Observable { _isScalar: false, _subscribe: [Function] }
// Observable { _isScalar: false, _subscribe: [Function] }
// Observable { _isScalar: false, _subscribe: [Function] }
// Done!
所以我想,我知道了!我会使用 flatMap
来展平 Observables。这应该产生三个布尔值,最终从那些 Observables 发出。但这也不起作用。它只发出一个值:
const result = readdirRx(FOO)
.pipe(
flatMap(from),
flatMap(dir =>
fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
)
);
result.subscribe(console.log, console.error, () => console.log('Done!'));
// Outputs:
// true
// Done!
编辑:
我试过 @IngoBurk's
const result = readdirRx(FOO)
.pipe(
flatMap(from),
flatMap(dir => fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
.pipe(
filter(Boolean),
map(() => dir),
)
),
);
result.subscribe(console.log, console.error, () => console.log('Done!'));
// Outputs:
// true
// Done!
你可以这样做:
readdirRx(FOO)
.pipe(
flatMap(from),
flatMap(dir => fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
.pipe(
catchError(res => of(res)),
filter(Boolean),
map(() => dir),
)
),
)
假设fileExistsAndReadableRx
returns Observable<boolean>
。 filter(Boolean)
只是 filter(v => !!v)
.
这里的技巧本质上是您尝试过的,即将每个目录(平面)映射到可观察到的 tmp 文件夹检查。然后我们使用该结果过滤掉那些不匹配的结果,然后将其映射回目录而不是那个中间结果。
您可以在此处查看实际效果:https://rxviz.com/v/d8djbkRO
you'd probably want to use
cb(null, !err)
to invoke the callback infileExistsAndReadableRx