使用 RailsEventStore 仅订阅单个流
Subscribe only to a single stream with RailsEventStore
我打算使用 RailsEventStore 将持久读取模型投射到关系数据库中。
为了实现这一点,我需要一个具有固定顺序且没有重复事件的输入流。因此,我构建了一个链接器来监听所有事件并将与我的投影相关的那些事件链接到一个单独的流中。
现在我想为给定的读取模型注册一个构建器作为该流的订阅者。但是我找不到订阅特定流的方法。
这可能吗?如果可以,怎么办?
无法订阅所选流中的事件(但这个想法似乎很有趣)。您需要实现您的构建器以定期从给定的流中读取和处理事件。它需要存储最后处理的事件 ID,下次运行时它应该只读取链接到流的新域事件。
(伪)代码可能如下所示:
class Builder
def initialize(event_store)
@event_store = event_store
@last_processed_event_id = read_last_processed_event
end
def call(stream)
events = read_all_since(stream, event_id)
process(events)
store_last_processed_event
end
private
def read_all_since(stream, id)
@event_store.read.stream(stream).from(id).to_a
end
def process(events)
events.each do |event|
#... do sth here
@last_processed_event_id = event.event_id
end
end
def load_last_processed_event
# read last processed event or return nil to start from beginning of stream
end
def store_last_processed_event
# store it somewhere
end
end
我打算使用 RailsEventStore 将持久读取模型投射到关系数据库中。
为了实现这一点,我需要一个具有固定顺序且没有重复事件的输入流。因此,我构建了一个链接器来监听所有事件并将与我的投影相关的那些事件链接到一个单独的流中。
现在我想为给定的读取模型注册一个构建器作为该流的订阅者。但是我找不到订阅特定流的方法。
这可能吗?如果可以,怎么办?
无法订阅所选流中的事件(但这个想法似乎很有趣)。您需要实现您的构建器以定期从给定的流中读取和处理事件。它需要存储最后处理的事件 ID,下次运行时它应该只读取链接到流的新域事件。
(伪)代码可能如下所示:
class Builder
def initialize(event_store)
@event_store = event_store
@last_processed_event_id = read_last_processed_event
end
def call(stream)
events = read_all_since(stream, event_id)
process(events)
store_last_processed_event
end
private
def read_all_since(stream, id)
@event_store.read.stream(stream).from(id).to_a
end
def process(events)
events.each do |event|
#... do sth here
@last_processed_event_id = event.event_id
end
end
def load_last_processed_event
# read last processed event or return nil to start from beginning of stream
end
def store_last_processed_event
# store it somewhere
end
end