将所有事件从 CommandGateway 路由到单个事件处理程序
Route all events from CommandGateway to single event handler
我正在使用 AxonFramework 实现 JGroups 集成,参考的是 this link。我对代码做了一些修改,并在不使用 Docker 的情况下运行了该项目。以下是我的代码 -
主要Class-
/**
 * Entry point that runs one {@link PrimaryNode} and one {@link SecondaryNode}, each on its own thread.
 */
public class ClusterRunner {

    /**
     * Constructs the primary node, then the secondary node, and starts their threads in that order.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Array initializers are evaluated left to right, so the primary node is built first.
        Thread[] workers = {
            new Thread(new PrimaryNode()),
            new Thread(new SecondaryNode())
        };
        for (Thread worker : workers) {
            worker.start();
        }
    }
}
主节点-
import org.axonframework.commandhandling.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy;
import org.axonframework.commandhandling.distributed.DistributedCommandBus;
import org.axonframework.commandhandling.distributed.commandfilter.AcceptAll;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.jgroups.commandhandling.JGroupsConnector;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.jgroups.JChannel;
/**
 * Cluster node that dispatches {@link CreateItem} commands through a JGroups-backed distributed command bus.
 *
 * <p>The constructor creates an in-memory event store local to this node, joins the
 * {@code axon-jgroups-demo} cluster and subscribes the {@link Item} aggregate's command handlers.
 * {@link #run()} then sends five commands and waits for each one to complete.
 */
public class PrimaryNode implements Runnable {
    private JGroupsConnector connector;
    private CommandGateway commandGateway;
    private EventStore eventStore;
    private CommandBus commandBus;

    /**
     * Wires the event store, distributed command bus, aggregate command handler and command gateway.
     *
     * @throws IllegalStateException if the distributed command bus cannot be configured or the
     *                               cluster cannot be joined; the original failure is attached as cause
     */
    public PrimaryNode() {
        eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine());
        try {
            commandBus = configureDistributedCommandBus();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            // Fail fast: continuing with a null command bus would only fail later with a less useful error.
            throw new IllegalStateException("Primary node could not set up the distributed command bus", e);
        }
        Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore);
        new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus);
        commandGateway = new DefaultCommandGateway(commandBus);
    }

    /**
     * Sends five {@link CreateItem} commands (item ids "0" to "4"), blocking on each until it is handled.
     */
    @Override
    public void run() {
        for (int a = 0; a < 5; a++) {
            // Read the clock once so the logged id is exactly the value carried by the command.
            String name = Long.toString(System.currentTimeMillis());
            System.out.println("Primary Node Created item " + a + " id: " + name);
            commandGateway.sendAndWait(new CreateItem(Integer.toString(a), name));
        }
    }

    /**
     * Creates the JGroups connector from the {@code tcp.xml} classpath resource, joins the cluster and
     * wraps the connector in a {@link DistributedCommandBus}.
     *
     * @return the distributed command bus backed by the JGroups connector
     * @throws IllegalStateException if {@code tcp.xml} is not on the classpath
     * @throws Exception             if the channel cannot be created or the cluster cannot be joined
     */
    private CommandBus configureDistributedCommandBus() throws Exception {
        CommandBus localSegment = new SimpleCommandBus();
        JChannel channel;
        // Close the configuration stream once the channel has been built from it.
        try (java.io.InputStream config = getClass().getClassLoader().getResourceAsStream("tcp.xml")) {
            if (config == null) {
                throw new IllegalStateException("JGroups configuration 'tcp.xml' not found on the classpath");
            }
            channel = new JChannel(config);
        }
        connector = new JGroupsConnector(localSegment, channel, "axon-jgroups-demo", new XStreamSerializer(),
                new AnnotationRoutingStrategy());
        // Register this member with a load factor of 100 and a filter that accepts every command.
        connector.updateMembership(100, AcceptAll.INSTANCE);
        connector.connect();
        connector.awaitJoined();
        return new DistributedCommandBus(connector, connector);
    }
}
次节点-
import org.axonframework.commandhandling.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy;
import org.axonframework.commandhandling.distributed.DistributedCommandBus;
import org.axonframework.commandhandling.distributed.commandfilter.AcceptAll;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.eventhandling.EventListener;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.SubscribingEventProcessor;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.jgroups.commandhandling.JGroupsConnector;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.jgroups.JChannel;
/**
 * Cluster node that handles {@link Item} commands and prints the payload of every event published
 * to its own in-memory event store.
 */
public class SecondaryNode implements Runnable {
    private JGroupsConnector connector;
    private EventStore eventStore;

    /**
     * Wires the event store and the distributed command bus, and subscribes the {@link Item}
     * aggregate's command handlers.
     *
     * @throws IllegalStateException if the distributed command bus cannot be configured or the
     *                               cluster cannot be joined; the original failure is attached as cause
     */
    public SecondaryNode() {
        eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine());
        CommandBus commandBus;
        try {
            commandBus = configureDistributedCommandBus();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            // Fail fast: continuing with a null command bus would only fail later with a less useful error.
            throw new IllegalStateException("Secondary node could not set up the distributed command bus", e);
        }
        Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore);
        new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus);
    }

    /**
     * Starts a subscribing event processor on this node's event store that prints each event payload.
     *
     * <p>NOTE(review): the processor is subscribed only when this method runs, while command handlers
     * are already subscribed by the constructor; events published before this point would presumably
     * not reach the listener. Confirm whether the subscription should happen earlier.
     */
    @Override
    public void run() {
        new SubscribingEventProcessor("processor", new SimpleEventHandlerInvoker((EventListener) event -> {
            System.out.println("Secondary Node -- " + event.getPayload());
        }), eventStore).start();
    }

    /**
     * Creates the JGroups connector from the {@code tcp_test.xml} classpath resource, joins the cluster
     * and wraps the connector in a {@link DistributedCommandBus}.
     *
     * @return the distributed command bus backed by the JGroups connector
     * @throws IllegalStateException if {@code tcp_test.xml} is not on the classpath
     * @throws Exception             if the channel cannot be created or the cluster cannot be joined
     */
    private CommandBus configureDistributedCommandBus() throws Exception {
        CommandBus localSegment = new SimpleCommandBus();
        JChannel channel;
        // Close the configuration stream once the channel has been built from it.
        try (java.io.InputStream config = getClass().getClassLoader().getResourceAsStream("tcp_test.xml")) {
            if (config == null) {
                throw new IllegalStateException("JGroups configuration 'tcp_test.xml' not found on the classpath");
            }
            channel = new JChannel(config);
        }
        connector = new JGroupsConnector(localSegment, channel, "axon-jgroups-demo", new XStreamSerializer(),
                new AnnotationRoutingStrategy());
        // Register this member with a load factor of 100 and a filter that accepts every command.
        connector.updateMembership(100, AcceptAll.INSTANCE);
        connector.connect();
        connector.awaitJoined();
        return new DistributedCommandBus(connector, connector);
    }
}
Item 类 -
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.TargetAggregateIdentifier;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.eventhandling.EventHandler;
import static org.axonframework.commandhandling.model.AggregateLifecycle.apply;
/**
 * Immutable command asking for an item to be created; the item id is the target aggregate identifier.
 */
class CreateItem {
    @TargetAggregateIdentifier
    private final String itemId;
    private final String name;

    /**
     * @param itemId identifier of the item to create
     * @param naam   name to give the item
     */
    public CreateItem(String itemId, String naam) {
        this.name = naam;
        this.itemId = itemId;
    }

    /** @return the name carried by this command */
    public String getName() {
        return this.name;
    }

    /** @return the identifier of the item this command targets */
    public String getItemId() {
        return this.itemId;
    }
}
/**
 * Immutable event carrying the identifier and name of a created item.
 */
class ItemCreated {
    private final String itemId;
    private final String name;

    /**
     * @param itemId identifier of the created item
     * @param naam   name of the created item
     */
    public ItemCreated(String itemId, String naam) {
        this.name = naam;
        this.itemId = itemId;
    }

    /** @return the name of the created item */
    public String getName() {
        return this.name;
    }

    /** @return the identifier of the created item */
    public String getItemId() {
        return this.itemId;
    }

    /** @return the item id and the name separated by a single space */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(this.itemId);
        text.append(" ");
        text.append(this.name);
        return text.toString();
    }
}
/**
 * Aggregate for a single item; its state is set from {@link ItemCreated} events.
 */
class Item {
    @AggregateIdentifier
    private String itemId;
    private String name;

    /** No-argument constructor; leaves both fields unset. */
    public Item() {
    }

    /**
     * Handles {@link CreateItem} by applying an {@link ItemCreated} event with the command's id and name.
     *
     * @param createItem the creation command
     */
    @CommandHandler
    public Item(CreateItem createItem) {
        ItemCreated created = new ItemCreated(createItem.getItemId(), createItem.getName());
        apply(created);
    }

    /**
     * Copies the identifier and name from the event into this aggregate.
     *
     * @param itemCreated the event describing the created item
     */
    @EventHandler
    public void itemCreated(ItemCreated itemCreated) {
        this.itemId = itemCreated.getItemId();
        this.name = itemCreated.getName();
    }
}
现在我的问题是:当我运行主类时,主节点产生 5 个事件,但次节点没有收到全部事件。它可能收到 2 个、3 个或 4 个事件,但不是全部。我希望所有事件都传递到次节点。我对 AxonFramework 和 JGroups 都很陌生,请帮我弄清楚问题出在哪里。
在尝试了各种办法之后,我决定改用自定义路由策略。我使用了 AbstractRoutingStrategy,它用于为没有明确目的地的命令消息决定路由。以下是在 JGroups 主节点(发送方)中可以正常工作的代码。将 PrimaryNode 类的 configureDistributedCommandBus() 方法修改为 -
/**
 * Creates the JGroups connector with a routing strategy that depends on the current cluster size,
 * joins the cluster and wraps the connector in a {@link DistributedCommandBus}.
 *
 * <p>While the cluster view has exactly two members, every command gets the constant routing key
 * {@code "secondary"}; otherwise the command's own identifier is used as the routing key.
 * NOTE(review): the literal is only a routing key, it does not by itself name a node. Confirm that
 * it resolves to the intended member.
 *
 * @return the distributed command bus backed by the JGroups connector
 * @throws Exception if the channel cannot be created or the cluster cannot be joined
 */
private CommandBus configureDistributedCommandBus() throws Exception {
    CommandBus commandBus = new SimpleCommandBus();
    channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp.xml"));
    RoutingStrategy rs = new AbstractRoutingStrategy(UnresolvedRoutingKeyPolicy.STATIC_KEY) {
        @Override
        protected String doResolveRoutingKey(CommandMessage<?> cmdMsg) {
            // Guard against a missing view so routing falls back to the message identifier.
            View view = channel.getView();
            if (view != null && view.getMembers().size() == 2) {
                return "secondary";
            }
            return cmdMsg.getIdentifier();
        }
    };
    connector = new JGroupsConnector(commandBus, channel, "axon-jgroups-demo", new XStreamSerializer(), rs);
    // Register this member with a load factor of 100 and a filter that accepts every command.
    connector.updateMembership(100, AcceptAll.INSTANCE);
    connector.connect();
    connector.awaitJoined();
    return new DistributedCommandBus(connector, connector);
}
由于我使用的是 JGroups,因此可以查看集群视图,即集群中有多少个节点,并据此决定命令消息的路由。
默认情况下,Axon 会将您的每个事件处理程序订阅到事件总线(在您的例子中是 EmbeddedEventStore)。这意味着只有当本地实例发布事件时才会调用处理程序,而事件是在处理命令时发布的。所以本质上,事件处理程序是在处理该命令的节点上被调用的。
或者,您可以将事件处理程序配置为以 "tracking"(跟踪)模式运行。在这种模式下,处理程序会打开一个到事件存储的连接;根据具体配置,每个节点都可以获得自己的一份事件副本,而不管事件是在哪里发布的。
我正在使用 AxonFramework 实现 JGroups 集成,参考的是 this link。我对代码做了一些修改,并在不使用 Docker 的情况下运行了该项目。以下是我的代码 -
主要Class-
/**
 * Entry point that runs one {@link PrimaryNode} and one {@link SecondaryNode}, each on its own thread.
 */
public class ClusterRunner {

    /**
     * Constructs the primary node, then the secondary node, and starts their threads in that order.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Array initializers are evaluated left to right, so the primary node is built first.
        Thread[] workers = {
            new Thread(new PrimaryNode()),
            new Thread(new SecondaryNode())
        };
        for (Thread worker : workers) {
            worker.start();
        }
    }
}
主节点-
import org.axonframework.commandhandling.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy;
import org.axonframework.commandhandling.distributed.DistributedCommandBus;
import org.axonframework.commandhandling.distributed.commandfilter.AcceptAll;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.jgroups.commandhandling.JGroupsConnector;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.jgroups.JChannel;
/**
 * Cluster node that dispatches {@link CreateItem} commands through a JGroups-backed distributed command bus.
 *
 * <p>The constructor creates an in-memory event store local to this node, joins the
 * {@code axon-jgroups-demo} cluster and subscribes the {@link Item} aggregate's command handlers.
 * {@link #run()} then sends five commands and waits for each one to complete.
 */
public class PrimaryNode implements Runnable {
    private JGroupsConnector connector;
    private CommandGateway commandGateway;
    private EventStore eventStore;
    private CommandBus commandBus;

    /**
     * Wires the event store, distributed command bus, aggregate command handler and command gateway.
     *
     * @throws IllegalStateException if the distributed command bus cannot be configured or the
     *                               cluster cannot be joined; the original failure is attached as cause
     */
    public PrimaryNode() {
        eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine());
        try {
            commandBus = configureDistributedCommandBus();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            // Fail fast: continuing with a null command bus would only fail later with a less useful error.
            throw new IllegalStateException("Primary node could not set up the distributed command bus", e);
        }
        Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore);
        new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus);
        commandGateway = new DefaultCommandGateway(commandBus);
    }

    /**
     * Sends five {@link CreateItem} commands (item ids "0" to "4"), blocking on each until it is handled.
     */
    @Override
    public void run() {
        for (int a = 0; a < 5; a++) {
            // Read the clock once so the logged id is exactly the value carried by the command.
            String name = Long.toString(System.currentTimeMillis());
            System.out.println("Primary Node Created item " + a + " id: " + name);
            commandGateway.sendAndWait(new CreateItem(Integer.toString(a), name));
        }
    }

    /**
     * Creates the JGroups connector from the {@code tcp.xml} classpath resource, joins the cluster and
     * wraps the connector in a {@link DistributedCommandBus}.
     *
     * @return the distributed command bus backed by the JGroups connector
     * @throws IllegalStateException if {@code tcp.xml} is not on the classpath
     * @throws Exception             if the channel cannot be created or the cluster cannot be joined
     */
    private CommandBus configureDistributedCommandBus() throws Exception {
        CommandBus localSegment = new SimpleCommandBus();
        JChannel channel;
        // Close the configuration stream once the channel has been built from it.
        try (java.io.InputStream config = getClass().getClassLoader().getResourceAsStream("tcp.xml")) {
            if (config == null) {
                throw new IllegalStateException("JGroups configuration 'tcp.xml' not found on the classpath");
            }
            channel = new JChannel(config);
        }
        connector = new JGroupsConnector(localSegment, channel, "axon-jgroups-demo", new XStreamSerializer(),
                new AnnotationRoutingStrategy());
        // Register this member with a load factor of 100 and a filter that accepts every command.
        connector.updateMembership(100, AcceptAll.INSTANCE);
        connector.connect();
        connector.awaitJoined();
        return new DistributedCommandBus(connector, connector);
    }
}
次节点-
import org.axonframework.commandhandling.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy;
import org.axonframework.commandhandling.distributed.DistributedCommandBus;
import org.axonframework.commandhandling.distributed.commandfilter.AcceptAll;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.eventhandling.EventListener;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.SubscribingEventProcessor;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.jgroups.commandhandling.JGroupsConnector;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.jgroups.JChannel;
/**
 * Cluster node that handles {@link Item} commands and prints the payload of every event published
 * to its own in-memory event store.
 */
public class SecondaryNode implements Runnable {
    private JGroupsConnector connector;
    private EventStore eventStore;

    /**
     * Wires the event store and the distributed command bus, and subscribes the {@link Item}
     * aggregate's command handlers.
     *
     * @throws IllegalStateException if the distributed command bus cannot be configured or the
     *                               cluster cannot be joined; the original failure is attached as cause
     */
    public SecondaryNode() {
        eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine());
        CommandBus commandBus;
        try {
            commandBus = configureDistributedCommandBus();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            // Fail fast: continuing with a null command bus would only fail later with a less useful error.
            throw new IllegalStateException("Secondary node could not set up the distributed command bus", e);
        }
        Repository<Item> repository = new EventSourcingRepository<>(Item.class, eventStore);
        new AggregateAnnotationCommandHandler<>(Item.class, repository).subscribe(commandBus);
    }

    /**
     * Starts a subscribing event processor on this node's event store that prints each event payload.
     *
     * <p>NOTE(review): the processor is subscribed only when this method runs, while command handlers
     * are already subscribed by the constructor; events published before this point would presumably
     * not reach the listener. Confirm whether the subscription should happen earlier.
     */
    @Override
    public void run() {
        new SubscribingEventProcessor("processor", new SimpleEventHandlerInvoker((EventListener) event -> {
            System.out.println("Secondary Node -- " + event.getPayload());
        }), eventStore).start();
    }

    /**
     * Creates the JGroups connector from the {@code tcp_test.xml} classpath resource, joins the cluster
     * and wraps the connector in a {@link DistributedCommandBus}.
     *
     * @return the distributed command bus backed by the JGroups connector
     * @throws IllegalStateException if {@code tcp_test.xml} is not on the classpath
     * @throws Exception             if the channel cannot be created or the cluster cannot be joined
     */
    private CommandBus configureDistributedCommandBus() throws Exception {
        CommandBus localSegment = new SimpleCommandBus();
        JChannel channel;
        // Close the configuration stream once the channel has been built from it.
        try (java.io.InputStream config = getClass().getClassLoader().getResourceAsStream("tcp_test.xml")) {
            if (config == null) {
                throw new IllegalStateException("JGroups configuration 'tcp_test.xml' not found on the classpath");
            }
            channel = new JChannel(config);
        }
        connector = new JGroupsConnector(localSegment, channel, "axon-jgroups-demo", new XStreamSerializer(),
                new AnnotationRoutingStrategy());
        // Register this member with a load factor of 100 and a filter that accepts every command.
        connector.updateMembership(100, AcceptAll.INSTANCE);
        connector.connect();
        connector.awaitJoined();
        return new DistributedCommandBus(connector, connector);
    }
}
Item 类 -
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.TargetAggregateIdentifier;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.eventhandling.EventHandler;
import static org.axonframework.commandhandling.model.AggregateLifecycle.apply;
/**
 * Immutable command asking for an item to be created; the item id is the target aggregate identifier.
 */
class CreateItem {
    @TargetAggregateIdentifier
    private final String itemId;
    private final String name;

    /**
     * @param itemId identifier of the item to create
     * @param naam   name to give the item
     */
    public CreateItem(String itemId, String naam) {
        this.name = naam;
        this.itemId = itemId;
    }

    /** @return the name carried by this command */
    public String getName() {
        return this.name;
    }

    /** @return the identifier of the item this command targets */
    public String getItemId() {
        return this.itemId;
    }
}
/**
 * Immutable event carrying the identifier and name of a created item.
 */
class ItemCreated {
    private final String itemId;
    private final String name;

    /**
     * @param itemId identifier of the created item
     * @param naam   name of the created item
     */
    public ItemCreated(String itemId, String naam) {
        this.name = naam;
        this.itemId = itemId;
    }

    /** @return the name of the created item */
    public String getName() {
        return this.name;
    }

    /** @return the identifier of the created item */
    public String getItemId() {
        return this.itemId;
    }

    /** @return the item id and the name separated by a single space */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(this.itemId);
        text.append(" ");
        text.append(this.name);
        return text.toString();
    }
}
/**
 * Aggregate for a single item; its state is set from {@link ItemCreated} events.
 */
class Item {
    @AggregateIdentifier
    private String itemId;
    private String name;

    /** No-argument constructor; leaves both fields unset. */
    public Item() {
    }

    /**
     * Handles {@link CreateItem} by applying an {@link ItemCreated} event with the command's id and name.
     *
     * @param createItem the creation command
     */
    @CommandHandler
    public Item(CreateItem createItem) {
        ItemCreated created = new ItemCreated(createItem.getItemId(), createItem.getName());
        apply(created);
    }

    /**
     * Copies the identifier and name from the event into this aggregate.
     *
     * @param itemCreated the event describing the created item
     */
    @EventHandler
    public void itemCreated(ItemCreated itemCreated) {
        this.itemId = itemCreated.getItemId();
        this.name = itemCreated.getName();
    }
}
现在我的问题是:当我运行主类时,主节点产生 5 个事件,但次节点没有收到全部事件。它可能收到 2 个、3 个或 4 个事件,但不是全部。我希望所有事件都传递到次节点。我对 AxonFramework 和 JGroups 都很陌生,请帮我弄清楚问题出在哪里。
在尝试了各种办法之后,我决定改用自定义路由策略。我使用了 AbstractRoutingStrategy,它用于为没有明确目的地的命令消息决定路由。以下是在 JGroups 主节点(发送方)中可以正常工作的代码。将 PrimaryNode 类的 configureDistributedCommandBus() 方法修改为 -
/**
 * Creates the JGroups connector with a routing strategy that depends on the current cluster size,
 * joins the cluster and wraps the connector in a {@link DistributedCommandBus}.
 *
 * <p>While the cluster view has exactly two members, every command gets the constant routing key
 * {@code "secondary"}; otherwise the command's own identifier is used as the routing key.
 * NOTE(review): the literal is only a routing key, it does not by itself name a node. Confirm that
 * it resolves to the intended member.
 *
 * @return the distributed command bus backed by the JGroups connector
 * @throws Exception if the channel cannot be created or the cluster cannot be joined
 */
private CommandBus configureDistributedCommandBus() throws Exception {
    CommandBus commandBus = new SimpleCommandBus();
    channel = new JChannel(getClass().getClassLoader().getResourceAsStream("tcp.xml"));
    RoutingStrategy rs = new AbstractRoutingStrategy(UnresolvedRoutingKeyPolicy.STATIC_KEY) {
        @Override
        protected String doResolveRoutingKey(CommandMessage<?> cmdMsg) {
            // Guard against a missing view so routing falls back to the message identifier.
            View view = channel.getView();
            if (view != null && view.getMembers().size() == 2) {
                return "secondary";
            }
            return cmdMsg.getIdentifier();
        }
    };
    connector = new JGroupsConnector(commandBus, channel, "axon-jgroups-demo", new XStreamSerializer(), rs);
    // Register this member with a load factor of 100 and a filter that accepts every command.
    connector.updateMembership(100, AcceptAll.INSTANCE);
    connector.connect();
    connector.awaitJoined();
    return new DistributedCommandBus(connector, connector);
}
由于我使用的是 JGroups,因此可以查看集群视图,即集群中有多少个节点,并据此决定命令消息的路由。
默认情况下,Axon 会将您的每个事件处理程序订阅到事件总线(在您的例子中是 EmbeddedEventStore)。这意味着只有当本地实例发布事件时才会调用处理程序,而事件是在处理命令时发布的。所以本质上,事件处理程序是在处理该命令的节点上被调用的。
或者,您可以将事件处理程序配置为以 "tracking"(跟踪)模式运行。在这种模式下,处理程序会打开一个到事件存储的连接;根据具体配置,每个节点都可以获得自己的一份事件副本,而不管事件是在哪里发布的。