如何 use/consume 来自 wolkenkit-eventstore 的事件流

How to use/consume an event stream from wolkenkit-eventstore

我想使用 wolkenkit 的 eventstore 并试图建立一个快速示例。但是我无法简单地输出事件流。

简化示例:

const eventstore = require("wolkenkit-eventstore/inmemory");
const Stream = require("stream");
const uuidv4 = require("uuid/v4");
const Event = require("commands-events/dist/Event");

const main = async () => {
    await eventstore.initialize();

    const aggregateId = uuidv4();
    const event = new Event({ ... });
    event.metadata.revision = 1;

    await eventstore.saveEvents({ events: event });

    const writableStream = new Stream.Writable();
    writableStream._write = (chunk, encoding, next) => {
        console.log(chunk.toString());
                next()
    };

    const readableStream = eventstore.getUnpublishedEventStream();
    readableStream.pipe(writableStream);
};

main();

据我了解,getUnpublishedEventStream returns 一个可读流。我按照 this 的说明进行操作,但没有按预期工作。 我得到的只是以下错误:

(node:10988) UnhandledPromiseRejectionWarning: TypeError: readableStream.pipe is not a function

根据 documentation of wolkenkit-eventstoregetUnpublishedEventStream 是一个 async 函数,即您必须用 await 调用它。否则,你不会得到一个流,而是一个承诺(并且一个承诺没有 pipe 函数)。

所以,这一行

const readableStream = eventstore.getUnpublishedEventStream();

应该是:

const readableStream = await eventstore.getUnpublishedEventStream();

除此之外我没有仔细查看您的代码,但这就是您收到当前错误消息的原因。

PS:请注意,我是wolkenkit的核心开发人员之一,所以请对我的回答持保留态度。