如何在 Vert.x 中配置 Axon 4 上下文

How to configure Axon 4 context in Vert.x

我正在使用 Axon 在我的 Vert.X 微服务中实施 CQRS/Event 采购。 在我的 Verticle 的 bootstrap 中,我有一个 createInfra 方法来创建我的 Axon 上下文。 当我尝试从 ny projection 获取资源时,我没有结果并且请求没有结束地执行。当我检查 QueryGateway 时,在 SimpleGatewayBus 中我没有订阅。

是否有人可以帮助我修复我的 Axon 配置?我在 MongoDB Eventstore 配置方面遇到了问题。

垂直

package com.omb..restadapter;

import com.omb..domain.commands.MyAggregate;
import com.omb..infra.repositories.MongoAggregateProjector;
import com.omb..infra.repositories.MongoAggregateRepository;
import com.omb..restadapter.handler.MyCommandHandler;
import com.omb..restadapter.handler.MyQueryHandler;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoDatabase;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.ext.auth.PubSecKeyOptions;
import io.vertx.ext.auth.jwt.JWTAuth;
import io.vertx.ext.auth.jwt.JWTAuthOptions;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.CorsHandler;
import io.vertx.ext.web.handler.JWTAuthHandler;
import io.vertx.ext.web.handler.StaticHandler;
import org.axonframework.config.AggregateConfigurer;
import org.axonframework.config.Configuration;
import org.axonframework.config.Configurer;
import org.axonframework.config.DefaultConfigurer;
import org.axonframework.eventhandling.tokenstore.jpa.JpaTokenStore;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.extensions.mongo.eventsourcing.tokenstore.MongoTokenStore;

import java.util.logging.Logger;

public class MyVerticle extends AbstractVerticle {

    private MyCommandHandler MyCommandHandler;
    private MyQueryHandler MyQueryHandler;
    private static final String APP_JSON = "application/json";
    private static final Logger log = Logger.getLogger(MyVerticle.class.getName());

    public static final String RESOURCE_NAME = "My";

    /**
     * Convenience method so you can run it in your IDE
     *
     * @param args
     */
    public static void main(String[] args) {
        Runner.runExample(MyVerticle.class);
    }

    /**
     * Start the verticle
     */
    @Override
    public void start(Promise<Void> startFuture) throws Exception {

        Future<Void> steps = createInfra().compose(t -> createRouter());

        steps.onComplete(ar -> {
            if(ar.succeeded()) {
                startFuture.complete();
            } else {
                log.throwing(MyVerticle.class.getName(), "start", ar.cause());
                startFuture.fail(ar.cause());
            }
        });
    }

    /**
     * Stop the Verticle
     */
    @Override
    public void stop() throws Exception {
        super.stop();
    }

    /**
     * Create the server
     *
     * @return
     */
    private Future<Void> createRouter() {
        return Future.future(promise -> {
            final Router router = Router.router(vertx);

            final JWTAuth authProvider = JWTAuth.create(vertx, getJWTAuthOptions());

            // allow route for CORS
            router.route()
                    .handler(CorsHandler.create(".*.").allowedMethod(io.vertx.core.http.HttpMethod.GET)
                            .allowedMethod(io.vertx.core.http.HttpMethod.POST)
                            .allowedMethod(io.vertx.core.http.HttpMethod.DELETE)
                            .allowedMethod(io.vertx.core.http.HttpMethod.PUT).allowCredentials(true)
                            .allowedHeader("Access-Control-Allow-Method").allowedHeader("Authorization")
                            .allowedHeader("idPartner").allowedHeader("Access-Control-Allow-Origin")
                            .allowedHeader("Access-Control-Allow-Credentials").allowedHeader("Content-Type"));

            router.route().handler(BodyHandler.create()).failureHandler(new FailureHandler());

            router.get("/my-reources").produces(APP_JSON).handler(MyQueryHandler::getAllResource);
            router.post("/my-resources").consumes(APP_JSON).produces(APP_JSON).handler(MyCommandHandler::createMy);
            

            // Healthcheck
            router.get("/health*").handler(HealthCheckHandler.create(vertx).register("health", res -> res.complete(Status.OK())));

            // Swagger
            router.route("/*").handler(StaticHandler.create());

            vertx.createHttpServer().requestHandler(router).listen(8083);
            promise.complete();
        });
    }

    private Future<Void> createInfra() {
        return Future.future(promise -> {

            MongoClient mongoClient = new MongoClient(config().getString(ConfigResource.CONFIG_MONGODB_URL));
            MongoDatabase database = mongoClient.getDatabase("resource");
            MongoAggregateRepository repository = new MongoMyRepository(database);
            MongoAggregateProjector MyProjector = new MongoAggregateProjector(repository);

            Configuration configuration = DefaultConfigurer
                    .defaultConfiguration()
                    .configureAggregate(MyAggregate.class)
                    .eventProcessing(conf -> conf.registerTokenStore(config -> MongoTokenStore.builder().build()))
                    .registerEventHandler(conf -> MyProjector)
                    .registerQueryHandler(conf -> MyProjector)
                    .buildConfiguration();

            // Write
            MyCommandHandler = new MyCommandHandler(configuration.commandGateway());

            // Read
            MyQueryHandler = new MyQueryHandler(configuration.queryGateway());

            promise.complete();
        });
    }
}

投影仪

package com.omb..infra.repositories;

import com.omb..domain.events.AggregateCreatedEvent;
import com.omb..domain.events.AggregateUpdatedEvent;
import com.omb..domain.queries.FindAggregateQuery;
import com.omb..domain.queries.FindAggregatesQuery;
import com.omb..domain.queries.IAggregateProjector;
import com.omb..domain.queries.AggregateView;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class MongoAggregateProjector implements IAggregateProjector {

    private MongoAggregateRepository repository;

    public MongoAggregateProjector(MongoAggregateRepository repository) {
        this.repository = repository;
    }

    @Override
    @EventHandler
    public void on(AggregateCreatedEvent event) {
        AggregateDocument AggregateDocument = new AggregateDocument(event.getId(), event.getName());
        repository.addAggregate(AggregateDocument);
    }

    @Override
    @EventHandler
    public void on(AggregateUpdatedEvent event) {
        AggregateDocument AggregateDocument = new AggregateDocument(event.getId(), event.getName());
        repository.updateAggregate(AggregateDocument);
    }

    @Override
    @QueryHandler
    public Optional<AggregateView> handle(FindAggregateQuery query) {
        return repository.getAggregate(query.getAggregateId()).map(AggregateDocument::toView);
    }

    @Override
    @QueryHandler
    public List<AggregateView> handle(FindAggregatesQuery query) {
        return repository.getAggregates().stream().map(AggregateDocument::toView).collect(Collectors.toList());
    }
}

汇总

package com.omb..domain.commands;

import com.omb..domain.events.AggregateCreatedEvent;
import com.omb..domain.events.AggregateUpdatedEvent;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;

import static org.axonframework.modelling.command.AggregateLifecycle.apply;

public class MyAggregate {

    @AggregateIdentifier
    private String myAggregateId;
    private String name;

    public Aggregate() {}

    @CommandHandler
    public Aggregate(CreateAggregateCommand command) {
        apply(new AggregateCreatedEvent(command.getId(), command.getName()));
    }

    @CommandHandler
    public void handle(UpdateAggregateCommand command) {
        apply(new AggregateUpdatedEvent(command.getId(), command.getName()));
    }

    @EventSourcingHandler
    public void on(AggregateCreatedEvent event) {
        AggregateId = event.getId();
        name = event.getName();
    }

    @EventSourcingHandler
    public void on(AggregateUpdatedEvent event) {
        AggregateId = event.getId();
        name = event.getName();
    }
}

我发现配置中有 2 个问题:

  1. 您只是“构建”了配置,但没有启动它。在 buildConfiguration() 之后,确保对返回的 Configuration 实例调用 'start()'。或者,直接在配置器上调用 start()。它 returns 一个启动的配置实例。

    这应该可以解决未通过的注册问题。但是很可能会触发下一个问题相关的异常....

  2. 您的 MongoTokenStore 配置不完整。 TokenStore 至少需要一个序列化程序和一个 MongoTemplate 实例。后者告诉 Axon 您想要哪些集合中的某些类型的信息。在您的情况下,只有 TrackingTokenCollection 是相关的。

    config -> MongoTokenStore.builder()
                             .mongoTemplate(
                                 DefaultMongoTemplate.builder()
                                                     // optionally choose collection names here
                                                     .mongoDatabase(mongoClient)
                                                     .build())
                             .serializer(Configuration::serializer)
                             .build()
    

我建议检查构建器中的选项,以确保您不需要配置特定于您的环境的任何其他内容。默认情况下适用于一般情况,但可能对您无效。