在 Vert.x 应用程序中使用 Project Reactor
Using Project Reactor in a Vert.x application
我在 Vert.x 应用程序中使用一个库 returns Project Reactor type Mono。
我有一个 Verticle 接收此反应类型,旨在通过事件总线将内容发送到另一个 Verticle:
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
public class HelperVerticle extends AbstractVerticle
{
public static final String ADDRESS = "address_1";
@Override
public void start() throws Exception
{
vertx.eventBus().consumer(ADDRESS, this::consume);
}
private void consume(Message<Object> message)
{
Mono.delay(Duration.ofMillis(3000))
.thenReturn("Content of Mono.") // this would come from external library
.publishOn(Schedulers.fromExecutor(vertx.nettyEventLoopGroup())) // is this needed?
.subscribe(output ->
{
System.out.println("My verticle: " + Thread.currentThread().getName());
message.reply(output + " " + message.body());
}, error -> message.fail(1, error.getMessage()));
}
}
这是正确的做法吗?在将消息发送到事件总线之前,我应该切换到 Vert.x 事件循环线程池吗?一起使用这些库时有什么我应该注意的吗?
代码对我来说看起来不错,除了你不应该使用 Netty 事件循环组作为执行器,而应该使用 Verticle 上下文:
public class HelperVerticle extends AbstractVerticle
{
public static final String ADDRESS = "address_1";
private Scheduler scheduler;
@Override
public void start() throws Exception
{
scheduler = Schedulers.fromExecutor(command -> context.runOnContext(v -> command.run()));
vertx.eventBus().consumer(ADDRESS, this::consume);
}
private void consume(Message<Object> message)
{
Mono.delay(Duration.ofMillis(3000))
.thenReturn("Content of Mono.") // this would come from external library
.publishOn(scheduler)
.subscribe(output ->
{
System.out.println("My verticle: " + Thread.currentThread().getName());
message.reply(output + " " + message.body());
}, error -> message.fail(1, error.getMessage()));
}
}
使用这样的调度程序,您可以确保 Verticle 状态不会被分配的事件循环以外的线程修改。
我在 Vert.x 应用程序中使用一个库 returns Project Reactor type Mono。
我有一个 Verticle 接收此反应类型,旨在通过事件总线将内容发送到另一个 Verticle:
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
public class HelperVerticle extends AbstractVerticle
{
public static final String ADDRESS = "address_1";
@Override
public void start() throws Exception
{
vertx.eventBus().consumer(ADDRESS, this::consume);
}
private void consume(Message<Object> message)
{
Mono.delay(Duration.ofMillis(3000))
.thenReturn("Content of Mono.") // this would come from external library
.publishOn(Schedulers.fromExecutor(vertx.nettyEventLoopGroup())) // is this needed?
.subscribe(output ->
{
System.out.println("My verticle: " + Thread.currentThread().getName());
message.reply(output + " " + message.body());
}, error -> message.fail(1, error.getMessage()));
}
}
这是正确的做法吗?在将消息发送到事件总线之前,我应该切换到 Vert.x 事件循环线程池吗?一起使用这些库时有什么我应该注意的吗?
代码对我来说看起来不错,除了你不应该使用 Netty 事件循环组作为执行器,而应该使用 Verticle 上下文:
public class HelperVerticle extends AbstractVerticle
{
public static final String ADDRESS = "address_1";
private Scheduler scheduler;
@Override
public void start() throws Exception
{
scheduler = Schedulers.fromExecutor(command -> context.runOnContext(v -> command.run()));
vertx.eventBus().consumer(ADDRESS, this::consume);
}
private void consume(Message<Object> message)
{
Mono.delay(Duration.ofMillis(3000))
.thenReturn("Content of Mono.") // this would come from external library
.publishOn(scheduler)
.subscribe(output ->
{
System.out.println("My verticle: " + Thread.currentThread().getName());
message.reply(output + " " + message.body());
}, error -> message.fail(1, error.getMessage()));
}
}
使用这样的调度程序,您可以确保 Verticle 状态不会被分配的事件循环以外的线程修改。