如何在 Vert.x 中配置 Axon 4 上下文
How to configure Axon 4 context in Vert.x
我正在使用 Axon 在我的 Vert.X 微服务中实施 CQRS/Event 采购。
在我的 Verticle 的 bootstrap 中,我有一个 createInfra 方法来创建我的 Axon 上下文。
当我尝试从 ny projection 获取资源时,我没有结果并且请求没有结束地执行。当我检查 QueryGateway 时,在 SimpleGatewayBus 中我没有订阅。
是否有人可以帮助我修复我的 Axon 配置?我在 MongoDB Eventstore 配置方面遇到了问题。
垂直
package com.omb..restadapter;
import com.omb..domain.commands.MyAggregate;
import com.omb..infra.repositories.MongoAggregateProjector;
import com.omb..infra.repositories.MongoAggregateRepository;
import com.omb..restadapter.handler.MyCommandHandler;
import com.omb..restadapter.handler.MyQueryHandler;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoDatabase;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.ext.auth.PubSecKeyOptions;
import io.vertx.ext.auth.jwt.JWTAuth;
import io.vertx.ext.auth.jwt.JWTAuthOptions;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.CorsHandler;
import io.vertx.ext.web.handler.JWTAuthHandler;
import io.vertx.ext.web.handler.StaticHandler;
import org.axonframework.config.AggregateConfigurer;
import org.axonframework.config.Configuration;
import org.axonframework.config.Configurer;
import org.axonframework.config.DefaultConfigurer;
import org.axonframework.eventhandling.tokenstore.jpa.JpaTokenStore;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.extensions.mongo.eventsourcing.tokenstore.MongoTokenStore;
import java.util.logging.Logger;
public class MyVerticle extends AbstractVerticle {
private MyCommandHandler MyCommandHandler;
private MyQueryHandler MyQueryHandler;
private static final String APP_JSON = "application/json";
private static final Logger log = Logger.getLogger(MyVerticle.class.getName());
public static final String RESOURCE_NAME = "My";
/**
* Convenience method so you can run it in your IDE
*
* @param args
*/
public static void main(String[] args) {
Runner.runExample(MyVerticle.class);
}
/**
* Start the verticle
*/
@Override
public void start(Promise<Void> startFuture) throws Exception {
Future<Void> steps = createInfra().compose(t -> createRouter());
steps.onComplete(ar -> {
if(ar.succeeded()) {
startFuture.complete();
} else {
log.throwing(MyVerticle.class.getName(), "start", ar.cause());
startFuture.fail(ar.cause());
}
});
}
/**
* Stop the Verticle
*/
@Override
public void stop() throws Exception {
super.stop();
}
/**
* Create the server
*
* @return
*/
private Future<Void> createRouter() {
return Future.future(promise -> {
final Router router = Router.router(vertx);
final JWTAuth authProvider = JWTAuth.create(vertx, getJWTAuthOptions());
// allow route for CORS
router.route()
.handler(CorsHandler.create(".*.").allowedMethod(io.vertx.core.http.HttpMethod.GET)
.allowedMethod(io.vertx.core.http.HttpMethod.POST)
.allowedMethod(io.vertx.core.http.HttpMethod.DELETE)
.allowedMethod(io.vertx.core.http.HttpMethod.PUT).allowCredentials(true)
.allowedHeader("Access-Control-Allow-Method").allowedHeader("Authorization")
.allowedHeader("idPartner").allowedHeader("Access-Control-Allow-Origin")
.allowedHeader("Access-Control-Allow-Credentials").allowedHeader("Content-Type"));
router.route().handler(BodyHandler.create()).failureHandler(new FailureHandler());
router.get("/my-reources").produces(APP_JSON).handler(MyQueryHandler::getAllResource);
router.post("/my-resources").consumes(APP_JSON).produces(APP_JSON).handler(MyCommandHandler::createMy);
// Healthcheck
router.get("/health*").handler(HealthCheckHandler.create(vertx).register("health", res -> res.complete(Status.OK())));
// Swagger
router.route("/*").handler(StaticHandler.create());
vertx.createHttpServer().requestHandler(router).listen(8083);
promise.complete();
});
}
private Future<Void> createInfra() {
return Future.future(promise -> {
MongoClient mongoClient = new MongoClient(config().getString(ConfigResource.CONFIG_MONGODB_URL));
MongoDatabase database = mongoClient.getDatabase("resource");
MongoAggregateRepository repository = new MongoMyRepository(database);
MongoAggregateProjector MyProjector = new MongoAggregateProjector(repository);
Configuration configuration = DefaultConfigurer
.defaultConfiguration()
.configureAggregate(MyAggregate.class)
.eventProcessing(conf -> conf.registerTokenStore(config -> MongoTokenStore.builder().build()))
.registerEventHandler(conf -> MyProjector)
.registerQueryHandler(conf -> MyProjector)
.buildConfiguration();
// Write
MyCommandHandler = new MyCommandHandler(configuration.commandGateway());
// Read
MyQueryHandler = new MyQueryHandler(configuration.queryGateway());
promise.complete();
});
}
}
投影仪
package com.omb..infra.repositories;
import com.omb..domain.events.AggregateCreatedEvent;
import com.omb..domain.events.AggregateUpdatedEvent;
import com.omb..domain.queries.FindAggregateQuery;
import com.omb..domain.queries.FindAggregatesQuery;
import com.omb..domain.queries.IAggregateProjector;
import com.omb..domain.queries.AggregateView;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
public class MongoAggregateProjector implements IAggregateProjector {
private MongoAggregateRepository repository;
public MongoAggregateProjector(MongoAggregateRepository repository) {
this.repository = repository;
}
@Override
@EventHandler
public void on(AggregateCreatedEvent event) {
AggregateDocument AggregateDocument = new AggregateDocument(event.getId(), event.getName());
repository.addAggregate(AggregateDocument);
}
@Override
@EventHandler
public void on(AggregateUpdatedEvent event) {
AggregateDocument AggregateDocument = new AggregateDocument(event.getId(), event.getName());
repository.updateAggregate(AggregateDocument);
}
@Override
@QueryHandler
public Optional<AggregateView> handle(FindAggregateQuery query) {
return repository.getAggregate(query.getAggregateId()).map(AggregateDocument::toView);
}
@Override
@QueryHandler
public List<AggregateView> handle(FindAggregatesQuery query) {
return repository.getAggregates().stream().map(AggregateDocument::toView).collect(Collectors.toList());
}
}
汇总
package com.omb..domain.commands;
import com.omb..domain.events.AggregateCreatedEvent;
import com.omb..domain.events.AggregateUpdatedEvent;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;
public class MyAggregate {
@AggregateIdentifier
private String myAggregateId;
private String name;
public Aggregate() {}
@CommandHandler
public Aggregate(CreateAggregateCommand command) {
apply(new AggregateCreatedEvent(command.getId(), command.getName()));
}
@CommandHandler
public void handle(UpdateAggregateCommand command) {
apply(new AggregateUpdatedEvent(command.getId(), command.getName()));
}
@EventSourcingHandler
public void on(AggregateCreatedEvent event) {
AggregateId = event.getId();
name = event.getName();
}
@EventSourcingHandler
public void on(AggregateUpdatedEvent event) {
AggregateId = event.getId();
name = event.getName();
}
}
我发现配置中有 2 个问题:
您只是“构建”了配置,但没有启动它。在 buildConfiguration()
之后,确保对返回的 Configuration 实例调用 'start()'。或者,直接在配置器上调用 start()
。它 returns 一个启动的配置实例。
这应该可以解决未通过的注册问题。但是很可能会触发下一个问题相关的异常....
您的 MongoTokenStore 配置不完整。 TokenStore 至少需要一个序列化程序和一个 MongoTemplate
实例。后者告诉 Axon 您想要哪些集合中的某些类型的信息。在您的情况下,只有 TrackingTokenCollection
是相关的。
config -> MongoTokenStore.builder()
.mongoTemplate(
DefaultMongoTemplate.builder()
// optionally choose collection names here
.mongoDatabase(mongoClient)
.build())
.serializer(Configuration::serializer)
.build()
我建议检查构建器中的选项,以确保您不需要配置特定于您的环境的任何其他内容。默认情况下适用于一般情况,但可能对您无效。
我正在使用 Axon 在我的 Vert.X 微服务中实施 CQRS/Event 采购。 在我的 Verticle 的 bootstrap 中,我有一个 createInfra 方法来创建我的 Axon 上下文。 当我尝试从 ny projection 获取资源时,我没有结果并且请求没有结束地执行。当我检查 QueryGateway 时,在 SimpleGatewayBus 中我没有订阅。
是否有人可以帮助我修复我的 Axon 配置?我在 MongoDB Eventstore 配置方面遇到了问题。
垂直
package com.omb..restadapter;
import com.omb..domain.commands.MyAggregate;
import com.omb..infra.repositories.MongoAggregateProjector;
import com.omb..infra.repositories.MongoAggregateRepository;
import com.omb..restadapter.handler.MyCommandHandler;
import com.omb..restadapter.handler.MyQueryHandler;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoDatabase;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.ext.auth.PubSecKeyOptions;
import io.vertx.ext.auth.jwt.JWTAuth;
import io.vertx.ext.auth.jwt.JWTAuthOptions;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.CorsHandler;
import io.vertx.ext.web.handler.JWTAuthHandler;
import io.vertx.ext.web.handler.StaticHandler;
import org.axonframework.config.AggregateConfigurer;
import org.axonframework.config.Configuration;
import org.axonframework.config.Configurer;
import org.axonframework.config.DefaultConfigurer;
import org.axonframework.eventhandling.tokenstore.jpa.JpaTokenStore;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.extensions.mongo.eventsourcing.tokenstore.MongoTokenStore;
import java.util.logging.Logger;
public class MyVerticle extends AbstractVerticle {
private MyCommandHandler MyCommandHandler;
private MyQueryHandler MyQueryHandler;
private static final String APP_JSON = "application/json";
private static final Logger log = Logger.getLogger(MyVerticle.class.getName());
public static final String RESOURCE_NAME = "My";
/**
* Convenience method so you can run it in your IDE
*
* @param args
*/
public static void main(String[] args) {
Runner.runExample(MyVerticle.class);
}
/**
* Start the verticle
*/
@Override
public void start(Promise<Void> startFuture) throws Exception {
Future<Void> steps = createInfra().compose(t -> createRouter());
steps.onComplete(ar -> {
if(ar.succeeded()) {
startFuture.complete();
} else {
log.throwing(MyVerticle.class.getName(), "start", ar.cause());
startFuture.fail(ar.cause());
}
});
}
/**
* Stop the Verticle
*/
@Override
public void stop() throws Exception {
super.stop();
}
/**
* Create the server
*
* @return
*/
private Future<Void> createRouter() {
return Future.future(promise -> {
final Router router = Router.router(vertx);
final JWTAuth authProvider = JWTAuth.create(vertx, getJWTAuthOptions());
// allow route for CORS
router.route()
.handler(CorsHandler.create(".*.").allowedMethod(io.vertx.core.http.HttpMethod.GET)
.allowedMethod(io.vertx.core.http.HttpMethod.POST)
.allowedMethod(io.vertx.core.http.HttpMethod.DELETE)
.allowedMethod(io.vertx.core.http.HttpMethod.PUT).allowCredentials(true)
.allowedHeader("Access-Control-Allow-Method").allowedHeader("Authorization")
.allowedHeader("idPartner").allowedHeader("Access-Control-Allow-Origin")
.allowedHeader("Access-Control-Allow-Credentials").allowedHeader("Content-Type"));
router.route().handler(BodyHandler.create()).failureHandler(new FailureHandler());
router.get("/my-reources").produces(APP_JSON).handler(MyQueryHandler::getAllResource);
router.post("/my-resources").consumes(APP_JSON).produces(APP_JSON).handler(MyCommandHandler::createMy);
// Healthcheck
router.get("/health*").handler(HealthCheckHandler.create(vertx).register("health", res -> res.complete(Status.OK())));
// Swagger
router.route("/*").handler(StaticHandler.create());
vertx.createHttpServer().requestHandler(router).listen(8083);
promise.complete();
});
}
private Future<Void> createInfra() {
return Future.future(promise -> {
MongoClient mongoClient = new MongoClient(config().getString(ConfigResource.CONFIG_MONGODB_URL));
MongoDatabase database = mongoClient.getDatabase("resource");
MongoAggregateRepository repository = new MongoMyRepository(database);
MongoAggregateProjector MyProjector = new MongoAggregateProjector(repository);
Configuration configuration = DefaultConfigurer
.defaultConfiguration()
.configureAggregate(MyAggregate.class)
.eventProcessing(conf -> conf.registerTokenStore(config -> MongoTokenStore.builder().build()))
.registerEventHandler(conf -> MyProjector)
.registerQueryHandler(conf -> MyProjector)
.buildConfiguration();
// Write
MyCommandHandler = new MyCommandHandler(configuration.commandGateway());
// Read
MyQueryHandler = new MyQueryHandler(configuration.queryGateway());
promise.complete();
});
}
}
投影仪
package com.omb..infra.repositories;
import com.omb..domain.events.AggregateCreatedEvent;
import com.omb..domain.events.AggregateUpdatedEvent;
import com.omb..domain.queries.FindAggregateQuery;
import com.omb..domain.queries.FindAggregatesQuery;
import com.omb..domain.queries.IAggregateProjector;
import com.omb..domain.queries.AggregateView;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
public class MongoAggregateProjector implements IAggregateProjector {
private MongoAggregateRepository repository;
public MongoAggregateProjector(MongoAggregateRepository repository) {
this.repository = repository;
}
@Override
@EventHandler
public void on(AggregateCreatedEvent event) {
AggregateDocument AggregateDocument = new AggregateDocument(event.getId(), event.getName());
repository.addAggregate(AggregateDocument);
}
@Override
@EventHandler
public void on(AggregateUpdatedEvent event) {
AggregateDocument AggregateDocument = new AggregateDocument(event.getId(), event.getName());
repository.updateAggregate(AggregateDocument);
}
@Override
@QueryHandler
public Optional<AggregateView> handle(FindAggregateQuery query) {
return repository.getAggregate(query.getAggregateId()).map(AggregateDocument::toView);
}
@Override
@QueryHandler
public List<AggregateView> handle(FindAggregatesQuery query) {
return repository.getAggregates().stream().map(AggregateDocument::toView).collect(Collectors.toList());
}
}
汇总
package com.omb..domain.commands;
import com.omb..domain.events.AggregateCreatedEvent;
import com.omb..domain.events.AggregateUpdatedEvent;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;
public class MyAggregate {
@AggregateIdentifier
private String myAggregateId;
private String name;
public Aggregate() {}
@CommandHandler
public Aggregate(CreateAggregateCommand command) {
apply(new AggregateCreatedEvent(command.getId(), command.getName()));
}
@CommandHandler
public void handle(UpdateAggregateCommand command) {
apply(new AggregateUpdatedEvent(command.getId(), command.getName()));
}
@EventSourcingHandler
public void on(AggregateCreatedEvent event) {
AggregateId = event.getId();
name = event.getName();
}
@EventSourcingHandler
public void on(AggregateUpdatedEvent event) {
AggregateId = event.getId();
name = event.getName();
}
}
我发现配置中有 2 个问题:
您只是“构建”了配置,但没有启动它。在
buildConfiguration()
之后,确保对返回的 Configuration 实例调用 'start()'。或者,直接在配置器上调用start()
。它 returns 一个启动的配置实例。这应该可以解决未通过的注册问题。但是很可能会触发下一个问题相关的异常....
您的 MongoTokenStore 配置不完整。 TokenStore 至少需要一个序列化程序和一个
MongoTemplate
实例。后者告诉 Axon 您想要哪些集合中的某些类型的信息。在您的情况下,只有TrackingTokenCollection
是相关的。config -> MongoTokenStore.builder() .mongoTemplate( DefaultMongoTemplate.builder() // optionally choose collection names here .mongoDatabase(mongoClient) .build()) .serializer(Configuration::serializer) .build()
我建议检查构建器中的选项,以确保您不需要配置特定于您的环境的任何其他内容。默认情况下适用于一般情况,但可能对您无效。