如何在 vertx Hazelcast 集群中设置分布式事件总线?

How to setup a distributed eventbus in a vertx Hazelcast Cluster?

这里是发件人verticle 我已启用多播并将 public 主机设置为我的机器 ip 地址

VertxOptions options = new VertxOptions()
                .setClusterManager(ClusterManagerConfig.getClusterManager());
       EventBusOptions eventBusOptions = new EventBusOptions()
               .setClustered(true)
               .setClusterPublicHost("10.10.1.160");

        options.setEventBusOptions(eventBusOptions);



        Vertx.clusteredVertx(options, res -> {
            if (res.succeeded()) {
                Vertx vertx = res.result();
                vertx.deployVerticle(new requestHandler());
                vertx.deployVerticle(new requestSender());
             
                EventBus eventBus = vertx.eventBus();
                eventBus.send("some.address","hello",reply -> {
                    System.out.println(reply.toString());
                   
                });


            } else {
                LOGGER.info("Failed: " + res.cause());
            }
        });
    }

这里是接收端


VertxOptions options = new VertxOptions().setClusterManager(mgr);
        options.setEventBusOptions(new EventBusOptions()
                .setClustered(true)
                .setClusterPublicHost("10.10.1.174") );



        Vertx.clusteredVertx(options, res -> {
            if (res.succeeded()) {
                Vertx vertx1 = res.result();

                System.out.println("Success");
                EventBus eb = vertx1.eventBus();
                System.out.println("ready");
                eb.consumer("some.address", message -> {
                  
                    message.reply("hello hello");
                });
            } else {
                System.out.println("Failed");
            }
        });

当我 运行 两个主 Verticle 时我得到这个结果,因此 Hazelcast 检测到 Verticle 并建立连接

INFO: [10.10.1.160]:33001 [dev] [3.10.5] Established socket connection between /10.10.1.160:33001 and /10.10.1.174:35725
Jan 11, 2021 11:45:10 AM com.hazelcast.internal.cluster.ClusterService
INFO: [10.10.1.160]:33001 [dev] [3.10.5] 

Members {size:2, ver:2} [
    Member [10.10.1.160]:33001 - 51b8c249-6b3c-4ca8-a238-c651845629d8 this
    Member [10.10.1.174]:33001 - 1cba1680-025e-469f-bad6-884111313672
]

Jan 11, 2021 11:45:10 AM com.hazelcast.internal.partition.impl.MigrationManager
INFO: [10.10.1.160]:33001 [dev] [3.10.5] Re-partitioning cluster data... Migration queue size: 271
Jan 11, 2021 11:45:11 AM com.hazelcast.nio.tcp.TcpIpAcceptor

但是当事件总线尝试向给定地址发送消息时我遇到了这个错误这是事件总线配置的问题吗?

Jan 11, 2021 11:59:57 AM io.vertx.core.eventbus.impl.clustered.ConnectionHolder
WARNING: Connecting to server 10.10.1.174:39561 failed
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /10.10.1.174:39561
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:665)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:905)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused
    ... 11 more

在Vert.x 3、集群主机和集群public主机默认为localhost.

如果您只更改 VertxOptions 中的集群 public 主机,Vert.x 会将 EventBus 传输服务器绑定到 localhost,同时告诉其他节点连接到 public主机.

在某些云提供商上运行 Vert.x时需要这种配置,但大多数情况下您只需要设置集群主机(然后public主机将默认为其值):

EventBusOptions eventBusOptions = new EventBusOptions()
  .setClustered(true)
  .setHost("10.10.1.160");