Event Store 持久订阅如何运作?
How do Event Store persistent subscriptions work?
我目前正在做一个项目,我们使用事件存储作为事件的写入存储。追加事件效果很好,当我们想要监听这些事件时,问题就来了。
为了监听 Event Store 中的事件,我们使用官方的 Event Store Node.js 客户端(通过 npm i @eventstore/db-client 安装)。
我们已经在管理界面(Admin UI)中创建了一个持久订阅。我们使用 eventStoreClient.connectToPersistentSubscription 连接到该订阅,并且连接成功。
问题是我们的事件会被永远重播。事实上,它们卡在了停放消息(parked messages)列表中。
重播事件是正常行为吗?
事件存储如何记住它提供给 Node.js 客户端的事件?
注意:事件当前正在播放,我们的投影以正确的方式构建,但事件将永远重播。
我们用来监听事件的代码
// Connect to the 'my-group' persistent subscription on the '$ce-precontrol'
// stream; the result is consumed below as an async iterable.
const stream = this.eventStoreClient.connectToPersistentSubscription<JSONEventType>('$ce-precontrol', 'my-group', {
bufferSize: 10,
}, {
})
// Handle the delivered messages one at a time, in delivery order.
for await (const e of stream) {
const { event, commitPosition, link } = e
// console.log(event, commitPosition, link)
if (!event) {
// No event on this message: log a warning and leave the enclosing function,
// which also ends this consumer loop.
this.logger.warn(`The event with commit position ${commitPosition} is undefined.`, { event, commitPosition, link })
return
}
// We should map the event instead of casting it.
// Fields of an event that are class instances will not be deserialized
// correctly by a cast; those classes need to be re-instantiated.
const eventBody = event.data as Event
console.log(`${eventBody.fpsProjectId} ----- ${eventBody.kind}`)
this.logger.info(`Dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`, { event, commitPosition, link })
try {
await this.eventHandler.dispatch(eventBody)
this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`, { event, commitPosition, link })
// Acknowledge using the id of `event`.
// NOTE(review): when the stream is made of link events and links are
// resolved, `event.id` is presumably the id of the linked event rather than
// of the link that was delivered (`link.id`) — confirm which id the
// subscription expects.
await stream.ack(event.id)
// this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was acknowledged successfully.`, { event, commitPosition, link })
} catch (error) {
// Dispatching or acknowledging failed: nack with the RETRY action, passing the
// error message as the reason, then log the error.
await stream.nack(RETRY, error.message, event.id)
this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`, { error })
}
}
我们的事件存储日志
{"@t":"2021-06-23T15:51:48.3692381Z","@mt":"SLOW BUS MSG [{bus}]: {message} - {elapsed}ms. Handler: {handler}.","@l":"Debug","bus":"PersistentSubscriptionsBus","message":"PersistentSubscriptionTimerTick","elapsed":1981,"handler":"PersistentSubscriptionService","SourceContext":"EventStore.Core.Bus.InMemoryBus","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:49.3743720Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:51:49.3746323Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:51:49.3746694Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:51:54.3993484Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:51:55.4072471Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:51:56.4114240Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:57.4147394Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:58.4250298Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4330048Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4333048Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4335573Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:04.4664585Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:05.4775447Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:06.4824377Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:07.4876752Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:08.4934409Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:09.5068553Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:09.5069364Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:09.5069701Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:14.5376026Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:15.5472390Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:16.5521045Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:17.5623183Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:18.5650711Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:19.5781601Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:19.5782624Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:19.5782980Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:24.6146992Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:25.6217820Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:26.6296129Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:27.6418730Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:28.6438550Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:29.6514819Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:29.6515344Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:29.6515586Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:34.6962266Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:35.6984264Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:36.7013623Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:37.7038263Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:38.7111554Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:39.7205755Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:39.7206264Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:39.7206490Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:44.7380921Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:45.7465243Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:46.7523226Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:47.7535593Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:48.7656883Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:49.7718183Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:49.7718754Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:49.7718986Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:54.8063912Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:55.8160008Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:56.8185175Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:57.8277609Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:58.8294945Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:59.8368339Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:59.8369266Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:59.8369451Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:00.4461267Z","@mt":"SLOW BUS MSG [{bus}]: {message} - {elapsed}ms. Handler: {handler}.","@l":"Debug","bus":"PersistentSubscriptionsBus","message":"PersistentSubscriptionTimerTick","elapsed":609,"handler":"PersistentSubscriptionService","SourceContext":"EventStore.Core.Bus.InMemoryBus","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:05.4787174Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:53:06.4825158Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:07.4891830Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:08.4988531Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:09.5058824Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:53:10.5174611Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:10.5175070Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:10.5175221Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:15.5455199Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:16.5574829Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:53:17.5670153Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:18.5712367Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:19.5807126Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:20.5878979Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:20.5879796Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:20.5880027Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
持久订阅配置
编辑:我们修改了脚本,改为确认(ack)link-to 事件本身,而不是它所指向的被链接(linked)事件,因为 $ce-precontrol 是一个由 link-to 事件组成的流。
我们的事件现在永远不会重试。
// Connect to the 'my-group' persistent subscription on the '$ce-precontrol'
// stream and handle the delivered messages one at a time.
const stream = this.eventStoreClient.connectToPersistentSubscription<JSONEventType>('$ce-precontrol', 'my-group', {
  bufferSize: 10,
}, {
})
for await (const e of stream) {
  const { event, commitPosition, link } = e
  if (!event) {
    this.logger.warn(`The event with commit position ${commitPosition} is undefined.`, { event, commitPosition, link })
    // NOTE(review): `return` leaves the enclosing function and therefore stops
    // consuming the subscription — confirm this is intended rather than `continue`.
    return
  }
  // Id used to ack/nack this message: the link event when the message carries
  // one, otherwise the event itself. Falling back to `event.id` avoids sending
  // an empty id, which would leave a non-link message unacknowledged.
  const ackId = link?.id ?? event.id
  // We should map the event instead of casting it.
  // Fields of an event that are class instances will not be deserialized
  // correctly by a cast; those classes need to be re-instantiated.
  const eventBody = event.data as Event
  console.log(`${eventBody.fpsProjectId} ----- ${eventBody.kind}`)
  this.logger.info(`Dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`, { event, commitPosition, link })
  try {
    await this.eventHandler.dispatch(eventBody)
    this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`, { event, commitPosition, link })
    await stream.ack(ackId)
    this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was acknowledged successfully.`, { event, commitPosition, link })
  } catch (error) {
    // The thrown value is not guaranteed to be an Error; narrow before reading `.message`.
    const reason = error instanceof Error ? error.message : String(error)
    // Log before nacking so the failure is recorded even if the nack call rejects.
    this.logger.error(`Something went wrong while dispatching an event. Error message: ${reason}`, { error })
    await stream.nack(RETRY, reason, ackId)
  }
}
我的事件永远重试。我正在收听由 link tos 组成的流。
我配置了持久订阅来解析 link-to 事件,所以当我访问 event.id 时,得到的是被链接(linked)事件的 ID。
我之前用这个被链接事件的 ID 来确认(ack)事件,而本应使用 link 事件的 ID 来确认。
因为我的事件没有确认,它们确实超时并重试,直到达到最大重试次数。
// Handle each message delivered by the persistent subscription, acknowledging
// it on success and asking for a retry on failure.
for await (const e of stream) {
  const { event, commitPosition, link } = e
  if (!event) {
    this.logger.warn(`The event with commit position ${commitPosition} is undefined.`, { event, commitPosition, link })
    // NOTE(review): `return` leaves the enclosing function and therefore stops
    // consuming the subscription — confirm this is intended rather than `continue`.
    return
  }
  // Ack/nack with the id of the link event when the message carries one; fall
  // back to the event's own id so a non-link message is never acknowledged
  // with an empty id.
  const ackId = link?.id ?? event.id
  // We should map the event instead of casting it.
  // Fields of an event that are class instances will not be deserialized
  // correctly by a cast; those classes need to be re-instantiated.
  const eventBody = event.data as Event
  console.log(`${eventBody.fpsProjectId} ----- ${eventBody.kind}`)
  this.logger.info(`Dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`, { event, commitPosition, link })
  try {
    await this.eventHandler.dispatch(eventBody)
    this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`, { event, commitPosition, link })
    await stream.ack(ackId)
    this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was acknowledged successfully.`, { event, commitPosition, link })
  } catch (error) {
    // The thrown value is not guaranteed to be an Error; narrow before reading `.message`.
    const reason = error instanceof Error ? error.message : String(error)
    // Log before nacking so the failure is recorded even if the nack call rejects.
    this.logger.error(`Something went wrong while dispatching an event. Error message: ${reason}`, { error })
    await stream.nack(RETRY, reason, ackId)
  }
}
我目前正在做一个项目,我们使用事件存储作为事件的写入存储。追加事件效果很好,当我们想要监听这些事件时,问题就来了。
为了监听 Event Store 中的事件,我们使用官方的 Event Store Node.js 客户端(通过 npm i @eventstore/db-client 安装)。
我们已经在管理界面(Admin UI)中创建了一个持久订阅。我们使用 eventStoreClient.connectToPersistentSubscription 连接到该订阅,并且连接成功。
问题是我们的事件会被永远重播。事实上,它们卡在了停放消息(parked messages)列表中。
重播事件是正常行为吗? 事件存储如何记住它提供给 Node.js 客户端的事件?
注意:事件当前正在播放,我们的投影以正确的方式构建,但事件将永远重播。
我们用来监听事件的代码
// Connect to the 'my-group' persistent subscription on the '$ce-precontrol'
// stream; the result is consumed below as an async iterable.
const stream = this.eventStoreClient.connectToPersistentSubscription<JSONEventType>('$ce-precontrol', 'my-group', {
bufferSize: 10,
}, {
})
// Handle the delivered messages one at a time, in delivery order.
for await (const e of stream) {
const { event, commitPosition, link } = e
// console.log(event, commitPosition, link)
if (!event) {
// No event on this message: log a warning and leave the enclosing function,
// which also ends this consumer loop.
this.logger.warn(`The event with commit position ${commitPosition} is undefined.`, { event, commitPosition, link })
return
}
// We should map the event instead of casting it.
// Fields of an event that are class instances will not be deserialized
// correctly by a cast; those classes need to be re-instantiated.
const eventBody = event.data as Event
console.log(`${eventBody.fpsProjectId} ----- ${eventBody.kind}`)
this.logger.info(`Dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`, { event, commitPosition, link })
try {
await this.eventHandler.dispatch(eventBody)
this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`, { event, commitPosition, link })
// Acknowledge using the id of `event`.
// NOTE(review): when the stream is made of link events and links are
// resolved, `event.id` is presumably the id of the linked event rather than
// of the link that was delivered (`link.id`) — confirm which id the
// subscription expects.
await stream.ack(event.id)
// this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was acknowledged successfully.`, { event, commitPosition, link })
} catch (error) {
// Dispatching or acknowledging failed: nack with the RETRY action, passing the
// error message as the reason, then log the error.
await stream.nack(RETRY, error.message, event.id)
this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`, { error })
}
}
我们的事件存储日志
{"@t":"2021-06-23T15:51:48.3692381Z","@mt":"SLOW BUS MSG [{bus}]: {message} - {elapsed}ms. Handler: {handler}.","@l":"Debug","bus":"PersistentSubscriptionsBus","message":"PersistentSubscriptionTimerTick","elapsed":1981,"handler":"PersistentSubscriptionService","SourceContext":"EventStore.Core.Bus.InMemoryBus","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:49.3743720Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:51:49.3746323Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:51:49.3746694Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:51:54.3993484Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:51:55.4072471Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:51:56.4114240Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:57.4147394Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:58.4250298Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4330048Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4333048Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4335573Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:04.4664585Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:05.4775447Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:06.4824377Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:07.4876752Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:08.4934409Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:09.5068553Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:09.5069364Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:09.5069701Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:14.5376026Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:15.5472390Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:16.5521045Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:17.5623183Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:18.5650711Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:19.5781601Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:19.5782624Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:19.5782980Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:24.6146992Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:25.6217820Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:26.6296129Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:27.6418730Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:28.6438550Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:29.6514819Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:29.6515344Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:29.6515586Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:34.6962266Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:35.6984264Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:36.7013623Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:37.7038263Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:38.7111554Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:39.7205755Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:39.7206264Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:39.7206490Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:44.7380921Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:45.7465243Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:46.7523226Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:47.7535593Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:48.7656883Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:49.7718183Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:49.7718754Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:49.7718986Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:54.8063912Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:55.8160008Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:56.8185175Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:57.8277609Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:58.8294945Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:59.8368339Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:59.8369266Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:59.8369451Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:00.4461267Z","@mt":"SLOW BUS MSG [{bus}]: {message} - {elapsed}ms. Handler: {handler}.","@l":"Debug","bus":"PersistentSubscriptionsBus","message":"PersistentSubscriptionTimerTick","elapsed":609,"handler":"PersistentSubscriptionService","SourceContext":"EventStore.Core.Bus.InMemoryBus","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:05.4787174Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:53:06.4825158Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:07.4891830Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:08.4988531Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:09.5058824Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:53:10.5174611Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:10.5175070Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:10.5175221Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:15.5455199Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:16.5574829Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:53:17.5670153Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:18.5712367Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:19.5807126Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:20.5878979Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:20.5879796Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:20.5880027Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
持久订阅配置
编辑:我们修改了脚本,改为确认(ack)link-to 事件本身,而不是它所指向的被链接事件,因为 $ce-precontrol 是一个由 link-to 事件组成的流。
现在我们的事件不会再被重试了。
// Open the persistent subscription for group 'my-group' on '$ce-precontrol'.
const stream = this.eventStoreClient.connectToPersistentSubscription<JSONEventType>(
  '$ce-precontrol',
  'my-group',
  { bufferSize: 10 },
  {},
)

// Drain the subscription: dispatch every delivered event, acknowledging it on
// success and asking for a retry on failure.
for await (const resolved of stream) {
  const { event, commitPosition, link } = resolved
  // Builds a fresh logging context object for each log call.
  const logContext = () => ({ event, commitPosition, link })

  if (!event) {
    this.logger.warn(`The event with commit position ${commitPosition} is undefined.`, logContext())
    return
  }

  // Id of the link entry, used for both ack and nack (empty string when there is no link).
  const ackId = link?.id || ''

  // The payload is cast rather than mapped, so class-typed fields are not
  // re-instantiated after deserialization.
  const payload = event.data as Event
  console.log(`${payload.fpsProjectId} ----- ${payload.kind}`)
  this.logger.info(`Dispatching an event ${payload.kind} ${payload.fpsProjectId}`, logContext())

  try {
    await this.eventHandler.dispatch(payload)
    this.logger.info(`Event ${payload.kind} ${payload.fpsProjectId} dispatched successfully.`, logContext())
    await stream.ack(ackId)
    this.logger.info(`Event ${payload.fpsProjectId} ${payload.kind} was acknowledged successfully.`, logContext())
  } catch (error) {
    await stream.nack(RETRY, error.message, ackId)
    this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`, { error })
  }
}
我的事件之前一直被无限重试。我监听的是一个由 link-to 事件组成的流。
我将持久订阅配置为解析 link-to,因此当我访问 event.id
时,得到的是被链接事件的 ID。
我之前是用这个被链接事件的 ID 来确认事件,而不是用 link 事件的 ID 来调用确认。
由于我的事件没有被确认,它们会超时并被重试,直到达到最大重试次数。
// Consume the persistent subscription: dispatch each delivered event, then ack it on
// success or nack it with RETRY on failure.
for await (const e of stream) {
  const { event, commitPosition, link } = e
  // console.log(event, commitPosition, link)
  if (!event) {
    this.logger.warn(`The event with commit position ${commitPosition} is undefined.`, { event, commitPosition, link })
    // NOTE(review): `return` leaves the for-await loop, so nothing further is consumed
    // after an entry without an event, and that entry is neither acked nor nacked.
    // Confirm this is intended.
    return
  }

  // Ack/nack must target the entry the subscription delivered. On a link-to stream
  // with link resolution enabled that is the link, not the resolved event. When the
  // entry is not a link, fall back to the event's own id instead of an empty string,
  // which would acknowledge nothing and leave the message to time out and be retried.
  const deliveredId = (link ?? event).id

  // TODO: map the event instead of casting it.
  // Fields of the event that are class instances are not restored by
  // deserialization; those classes need to be re-instantiated.
  const eventBody = event.data as Event
  console.log(`${eventBody.fpsProjectId} ----- ${eventBody.kind}`)
  this.logger.info(`Dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`, { event, commitPosition, link })
  try {
    await this.eventHandler.dispatch(eventBody)
    this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`, { event, commitPosition, link })
    await stream.ack(deliveredId)
    this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was acknowledged successfully.`, { event, commitPosition, link })
  } catch (error: unknown) {
    // A thrown value is not guaranteed to be an Error; narrow before reading `.message`.
    const reason = error instanceof Error ? error.message : String(error)
    // Log before nacking so the failure is recorded even if the nack call itself rejects.
    this.logger.error(`Something went wrong while dispatching an event. Error message: ${reason}`, { error })
    await stream.nack(RETRY, reason, deliveredId)
  }
}