如果我在调用异步函数后调用它,为什么我的 Observer 订阅者不是 运行?

Why is my Observer subscriber not running if I call it after I call an async function?

我有一些代码如下所示:

async function promptHandler(source) {
    source.subscribe(function(line) {
      console.log(`line == ${line}`);
    });
    let matchingTests = await getMatchingTests('ROGL');
}

这会打印出 source Observable 的内容,它正在侦听 ReadStreamtxt 文件。当调用上面的函数时,我看到了文件的输出。但是,如果我在调用 subscribe() 之后调用 getMatchingTests(),如下所示:

async function promptHandler(source) {
    let matchingTests = await getMatchingTests('ROGL');
    source.subscribe(function(line) {
      console.log(`line == ${line}`);
    });
}

我没有看到 txt 文件的内容。我知道 matchingTests 变量包含 getMatchingTests 的成功结果,所以我认为它不会阻止 Node 执行该行。

我猜测关于 getMatchingTests 异步函数调用的某些东西正在扰乱 source Observable,但我没有看到如何。

这是我的 source 代码:

let fileStream = createReadStream(file)
    .pipe(split());

let source = new Observable(o => {
   fileStream.on('data', line => {console.log('data'); o.next(line);});
   fileStream.on('error', err => o.error(err));
   fileStream.on('end', () => {console.log('end'); o.complete();});
});

我的直觉是源 observable 是一个热源,当 await 返回匹配测试时,您的文本文件已经被读取。所以当你在那个时候订阅时,没有一行可读,它们在你订阅源之前就被阅读了。

更新: 给定你的代码,如果顺序对你的用例来说是个问题,你可以考虑将文件流创建移动到可观察工厂中,即

let source = new Observable(o => {
let fileStream = createReadStream(file)
    .pipe(split());
   fileStream.on('data', line => {console.log('data'); o.next(line);});
   fileStream.on('error', err => o.error(err));
   fileStream.on('end', () => {console.log('end'); o.complete();});
});

这样,只有在您订阅时才会创建和启动流。