如何在 Quarkus 中启动多个消息消费者?

How to start multiple message consumers in Quarkus?

我正在尝试从 Vert.x 迁移到 Quarkus 并在 Vert.x 中,当我编写 Kafka/AMQP 等消息消费者时。我必须扩展垂直数量以最大化性能跨多个内核,即垂直缩放——这在 Quarkus 中可能吗?我看到一个类似的问题 here 但没有人回答。

例如,对于 Kafka,我可能会在垂直内创建一个消费者,然后 scale 在进行性能测试后,该垂直说 10 次(即指定部署中的实例数为 10)确定这是最佳数字。我的理解是,默认情况下,1 个垂直 = 1 个事件循环并且不会跨多个核心扩展。

我知道在 Quarkus 中使用 Vert.x 垂直是 possible,但是否有另一种方法来扩展跨多个核心的 Kafka 消费者数量?

我看到这种类型的可扩展性可以为 Quarkus 之类的东西配置 HTTP 但我找不到任何关于消息消费者的信息。

这里是 Vert.x Verticle 方法,总体而言我非常满意,但我希望有更好的文档来说明如何做到这一点。

更新 - 字段注入不适用于此示例,但构造函数注入有效。

假设我想注入这个

@ApplicationScoped
public class CoffeeRepositoryService {

    public CoffeeRepositoryService() {
        System.out.println("Injection succeeded!");
    }
}

这是我的 Verticle

package org.acme;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
import io.vertx.mutiny.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.QueueOptions;
import io.vertx.rabbitmq.RabbitMQOptions;

public class RQVerticle extends AbstractVerticle {

  private final Logger LOGGER = LoggerFactory.getLogger(org.acme.RQVerticle.class);

  //This doesn't work - returns null
  @Inject
  CoffeeRepositoryService coffeeRepositoryService;

  RQVerticle() {} // dummy constructor needed
   
  @Inject // constructor injection - this does work
  RQVerticle(CoffeeRepositoryService coffeeRepositoryService) {
    //Here coffeeRepositoryService is injected properly
  }

  @Override
  public Uni<Void> asyncStart() {
    LOGGER.info(
        "Creating RabbitMQ Connection after Quarkus successful initialization");

    RabbitMQOptions config = new RabbitMQOptions();
    config.setUri("amqp://localhost:5672");
    RabbitMQClient client = RabbitMQClient.create(vertx, config);
    
    Uni<Void> clientResp = client.start();
    clientResp.subscribe()
        .with(asyncResult -> {
          LOGGER.info("RabbitMQ successfully connected!");
              
        });

      return clientResp;
  }
}

主要Class - 注入不是这样工作的

package org.acme;

import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.QuarkusApplication;
import io.quarkus.runtime.annotations.QuarkusMain;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;

@QuarkusMain
public class Main {

  public static void main(String... args) {
    Quarkus.run(MyApp.class, args);
  }

  public static class MyApp implements QuarkusApplication {

    @Override
    public int run(String... args) throws Exception {
      var vertx = Vertx.vertx();
      System.out.println("Deployment Starting");

      DeploymentOptions options = new DeploymentOptions()
          .setInstances(2);
      vertx.deployVerticleAndAwait(RQVerticle::new, options);

      System.out.println("Deployment completed");

      Quarkus.waitForExit();
      return 0;
    }
  }
}

主要 Class 具有工作注入但不能部署多个实例

package org.acme;

import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import org.jboss.logging.Logger;

@ApplicationScoped
public class MainVerticles {

  private static final Logger LOGGER = Logger.getLogger(MainVerticles.class);

  public void init(@Observes StartupEvent e, Vertx vertx, RQVerticle verticle) {
    public void init(@Observes StartupEvent e, Vertx vertx, RQVerticle verticle) {
    DeploymentOptions options = new DeploymentOptions()
    .setInstances(2);
    vertx.deployVerticle(verticle,options).await().indefinitely();
  }
}

标准输出 - 第一个主要 class 看起来不错

2021-09-15 15:48:12,052 INFO [org.acm.RQVerticle] (vert.x-eventloop-thread-2) Creating RabbitMQ Connection after Quarkus successful initialization

2021-09-15 15:48:12,053 INFO [org.acm.RQVerticle] (vert.x-eventloop-thread-3) Creating RabbitMQ Connection after Quarkus successful initialization

标准输出 - 第二主线 class

2021-09-22 15:48:11,986 ERROR [io.qua.run.Application] (Quarkus Main Thread) Failed to start application (with profile dev): java.lang.IllegalArgumentException: Can't specify > 1 instances for already created verticle