如何 use/consume 来自 wolkenkit-eventstore 的事件流
How to use/consume an event stream from wolkenkit-eventstore
我想使用 wolkenkit 的 eventstore 并试图建立一个快速示例。但是我无法简单地输出事件流。
简化示例:
const eventstore = require("wolkenkit-eventstore/inmemory");
const Stream = require("stream");
const uuidv4 = require("uuid/v4");
const Event = require("commands-events/dist/Event");
const main = async () => {
await eventstore.initialize();
const aggregateId = uuidv4();
const event = new Event({ ... });
event.metadata.revision = 1;
await eventstore.saveEvents({ events: event });
const writableStream = new Stream.Writable();
writableStream._write = (chunk, encoding, next) => {
console.log(chunk.toString());
next()
};
const readableStream = eventstore.getUnpublishedEventStream();
readableStream.pipe(writableStream);
};
main();
据我了解,getUnpublishedEventStream returns 一个可读流。我按照 this 的说明进行操作,但没有按预期工作。
我得到的只是以下错误:
(node:10988) UnhandledPromiseRejectionWarning: TypeError: readableStream.pipe is not a function
根据 documentation of wolkenkit-eventstore,getUnpublishedEventStream
是一个 async
函数,即您必须用 await
调用它。否则,你不会得到一个流,而是一个承诺(并且一个承诺没有 pipe
函数)。
所以,这一行
const readableStream = eventstore.getUnpublishedEventStream();
应该是:
const readableStream = await eventstore.getUnpublishedEventStream();
除此之外我没有仔细查看您的代码,但这就是您收到当前错误消息的原因。
PS:请注意,我是wolkenkit的核心开发人员之一,所以请对我的回答持保留态度。
我想使用 wolkenkit 的 eventstore 并试图建立一个快速示例。但是我无法简单地输出事件流。
简化示例:
const eventstore = require("wolkenkit-eventstore/inmemory");
const Stream = require("stream");
const uuidv4 = require("uuid/v4");
const Event = require("commands-events/dist/Event");
const main = async () => {
await eventstore.initialize();
const aggregateId = uuidv4();
const event = new Event({ ... });
event.metadata.revision = 1;
await eventstore.saveEvents({ events: event });
const writableStream = new Stream.Writable();
writableStream._write = (chunk, encoding, next) => {
console.log(chunk.toString());
next()
};
const readableStream = eventstore.getUnpublishedEventStream();
readableStream.pipe(writableStream);
};
main();
据我了解,getUnpublishedEventStream returns 一个可读流。我按照 this 的说明进行操作,但没有按预期工作。 我得到的只是以下错误:
(node:10988) UnhandledPromiseRejectionWarning: TypeError: readableStream.pipe is not a function
根据 documentation of wolkenkit-eventstore,getUnpublishedEventStream
是一个 async
函数,即您必须用 await
调用它。否则,你不会得到一个流,而是一个承诺(并且一个承诺没有 pipe
函数)。
所以,这一行
const readableStream = eventstore.getUnpublishedEventStream();
应该是:
const readableStream = await eventstore.getUnpublishedEventStream();
除此之外我没有仔细查看您的代码,但这就是您收到当前错误消息的原因。
PS:请注意,我是wolkenkit的核心开发人员之一,所以请对我的回答持保留态度。