如何在 Quarkus 中启动多个消息消费者?
How to start multiple message consumers in Quarkus?
我正在尝试从 Vert.x 迁移到 Quarkus 并在 Vert.x 中,当我编写 Kafka/AMQP 等消息消费者时。我必须扩展垂直数量以最大化性能跨多个内核,即垂直缩放——这在 Quarkus 中可能吗?我看到一个类似的问题 here 但没有人回答。
例如,对于 Kafka,我可能会在垂直内创建一个消费者,然后 scale 在进行性能测试后,该垂直说 10 次(即指定部署中的实例数为 10)确定这是最佳数字。我的理解是,默认情况下,1 个垂直 = 1 个事件循环并且不会跨多个核心扩展。
我知道在 Quarkus 中使用 Vert.x 垂直是 possible,但是否有另一种方法来扩展跨多个核心的 Kafka 消费者数量?
我看到这种类型的可扩展性可以为 Quarkus 之类的东西配置 HTTP 但我找不到任何关于消息消费者的信息。
这里是 Vert.x Verticle 方法,总体而言我非常满意,但我希望有更好的文档来说明如何做到这一点。
更新 - 字段注入不适用于此示例,但构造函数注入有效。
假设我想注入这个
@ApplicationScoped
public class CoffeeRepositoryService {
public CoffeeRepositoryService() {
System.out.println("Injection succeeded!");
}
}
这是我的 Verticle
package org.acme;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
import io.vertx.mutiny.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.QueueOptions;
import io.vertx.rabbitmq.RabbitMQOptions;
public class RQVerticle extends AbstractVerticle {
private final Logger LOGGER = LoggerFactory.getLogger(org.acme.RQVerticle.class);
//This doesn't work - returns null
@Inject
CoffeeRepositoryService coffeeRepositoryService;
RQVerticle() {} // dummy constructor needed
@Inject // constructor injection - this does work
RQVerticle(CoffeeRepositoryService coffeeRepositoryService) {
//Here coffeeRepositoryService is injected properly
}
@Override
public Uni<Void> asyncStart() {
LOGGER.info(
"Creating RabbitMQ Connection after Quarkus successful initialization");
RabbitMQOptions config = new RabbitMQOptions();
config.setUri("amqp://localhost:5672");
RabbitMQClient client = RabbitMQClient.create(vertx, config);
Uni<Void> clientResp = client.start();
clientResp.subscribe()
.with(asyncResult -> {
LOGGER.info("RabbitMQ successfully connected!");
});
return clientResp;
}
}
主要Class - 注入不是这样工作的
package org.acme;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.QuarkusApplication;
import io.quarkus.runtime.annotations.QuarkusMain;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;
@QuarkusMain
public class Main {
public static void main(String... args) {
Quarkus.run(MyApp.class, args);
}
public static class MyApp implements QuarkusApplication {
@Override
public int run(String... args) throws Exception {
var vertx = Vertx.vertx();
System.out.println("Deployment Starting");
DeploymentOptions options = new DeploymentOptions()
.setInstances(2);
vertx.deployVerticleAndAwait(RQVerticle::new, options);
System.out.println("Deployment completed");
Quarkus.waitForExit();
return 0;
}
}
}
主要 Class 具有工作注入但不能部署多个实例
package org.acme;
import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import org.jboss.logging.Logger;
@ApplicationScoped
public class MainVerticles {
private static final Logger LOGGER = Logger.getLogger(MainVerticles.class);
public void init(@Observes StartupEvent e, Vertx vertx, RQVerticle verticle) {
public void init(@Observes StartupEvent e, Vertx vertx, RQVerticle verticle) {
DeploymentOptions options = new DeploymentOptions()
.setInstances(2);
vertx.deployVerticle(verticle,options).await().indefinitely();
}
}
标准输出 - 第一个主要 class 看起来不错
2021-09-15 15:48:12,052 INFO [org.acm.RQVerticle] (vert.x-eventloop-thread-2) Creating RabbitMQ Connection after Quarkus successful initialization
2021-09-15 15:48:12,053 INFO [org.acm.RQVerticle] (vert.x-eventloop-thread-3) Creating RabbitMQ Connection after Quarkus successful initialization
标准输出 - 第二主线 class
2021-09-22 15:48:11,986 ERROR [io.qua.run.Application] (Quarkus Main
Thread) Failed to start application (with profile dev):
java.lang.IllegalArgumentException: Can't specify > 1 instances for
already created verticle
我正在尝试从 Vert.x 迁移到 Quarkus 并在 Vert.x 中,当我编写 Kafka/AMQP 等消息消费者时。我必须扩展垂直数量以最大化性能跨多个内核,即垂直缩放——这在 Quarkus 中可能吗?我看到一个类似的问题 here 但没有人回答。
例如,对于 Kafka,我可能会在垂直内创建一个消费者,然后 scale 在进行性能测试后,该垂直说 10 次(即指定部署中的实例数为 10)确定这是最佳数字。我的理解是,默认情况下,1 个垂直 = 1 个事件循环并且不会跨多个核心扩展。
我知道在 Quarkus 中使用 Vert.x 垂直是 possible,但是否有另一种方法来扩展跨多个核心的 Kafka 消费者数量?
我看到这种类型的可扩展性可以为 Quarkus 之类的东西配置 HTTP 但我找不到任何关于消息消费者的信息。
这里是 Vert.x Verticle 方法,总体而言我非常满意,但我希望有更好的文档来说明如何做到这一点。
更新 - 字段注入不适用于此示例,但构造函数注入有效。
假设我想注入这个
@ApplicationScoped
public class CoffeeRepositoryService {
public CoffeeRepositoryService() {
System.out.println("Injection succeeded!");
}
}
这是我的 Verticle
package org.acme;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
import io.vertx.mutiny.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.QueueOptions;
import io.vertx.rabbitmq.RabbitMQOptions;
public class RQVerticle extends AbstractVerticle {
private final Logger LOGGER = LoggerFactory.getLogger(org.acme.RQVerticle.class);
//This doesn't work - returns null
@Inject
CoffeeRepositoryService coffeeRepositoryService;
RQVerticle() {} // dummy constructor needed
@Inject // constructor injection - this does work
RQVerticle(CoffeeRepositoryService coffeeRepositoryService) {
//Here coffeeRepositoryService is injected properly
}
@Override
public Uni<Void> asyncStart() {
LOGGER.info(
"Creating RabbitMQ Connection after Quarkus successful initialization");
RabbitMQOptions config = new RabbitMQOptions();
config.setUri("amqp://localhost:5672");
RabbitMQClient client = RabbitMQClient.create(vertx, config);
Uni<Void> clientResp = client.start();
clientResp.subscribe()
.with(asyncResult -> {
LOGGER.info("RabbitMQ successfully connected!");
});
return clientResp;
}
}
主要Class - 注入不是这样工作的
package org.acme;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.QuarkusApplication;
import io.quarkus.runtime.annotations.QuarkusMain;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;
@QuarkusMain
public class Main {
public static void main(String... args) {
Quarkus.run(MyApp.class, args);
}
public static class MyApp implements QuarkusApplication {
@Override
public int run(String... args) throws Exception {
var vertx = Vertx.vertx();
System.out.println("Deployment Starting");
DeploymentOptions options = new DeploymentOptions()
.setInstances(2);
vertx.deployVerticleAndAwait(RQVerticle::new, options);
System.out.println("Deployment completed");
Quarkus.waitForExit();
return 0;
}
}
}
主要 Class 具有工作注入但不能部署多个实例
package org.acme;
import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import org.jboss.logging.Logger;
@ApplicationScoped
public class MainVerticles {
private static final Logger LOGGER = Logger.getLogger(MainVerticles.class);
public void init(@Observes StartupEvent e, Vertx vertx, RQVerticle verticle) {
public void init(@Observes StartupEvent e, Vertx vertx, RQVerticle verticle) {
DeploymentOptions options = new DeploymentOptions()
.setInstances(2);
vertx.deployVerticle(verticle,options).await().indefinitely();
}
}
标准输出 - 第一个主要 class 看起来不错
2021-09-15 15:48:12,052 INFO [org.acm.RQVerticle] (vert.x-eventloop-thread-2) Creating RabbitMQ Connection after Quarkus successful initialization
2021-09-15 15:48:12,053 INFO [org.acm.RQVerticle] (vert.x-eventloop-thread-3) Creating RabbitMQ Connection after Quarkus successful initialization
标准输出 - 第二主线 class
2021-09-22 15:48:11,986 ERROR [io.qua.run.Application] (Quarkus Main Thread) Failed to start application (with profile dev): java.lang.IllegalArgumentException: Can't specify > 1 instances for already created verticle