ActiveMQ failover timeout works randomly

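I'm having a problem with the failover timeout option. Basically, I want a client that keeps sending messages to the broker. If the broker goes down, the client should keep trying to reconnect until the broker is back up. In the meantime, a send should time out instead of staying pending forever.

However, after some testing, the timeout option doesn't seem to work reliably. Sometimes the send times out, but sometimes it just hangs. I know there is another option, maxReconnectAttempts, but the problem is that I want the client to keep trying to reconnect forever.

Here is the URL I'm using: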

failover:(tcp://10.5.0.198:61616)?timeout=1000

I have a two-server setup: one machine acts as the broker and the other as the client. When I stop and start the broker, I get the following log messages:

We sent a Message!
2016-02-24 20:29:06,967 [198:61616@17075] - WARN  FailoverTransport - Transport (tcp://10.5.0.198:61616) failed, reason:  java.io.EOFException, attempting to automatically reconnect
Failover timeout of 1000 ms reached.
Failover timeout of 1000 ms reached.
Failover timeout of 1000 ms reached.
Failover timeout of 1000 ms reached.
Failover timeout of 1000 ms reached.
Failover timeout of 1000 ms reached.
We sent a Message!
We sent a Message!
We sent a Message!
We sent a Message!
We sent a Message!
2016-02-24 20:29:48,688 [198:61616@17099] - WARN  FailoverTransport - Transport (tcp://10.5.0.198:61616) failed, reason:  java.io.EOFException, attempting to automatically reconnect
We sent a Message!
We sent a Message!
We sent a Message!
We sent a Message!
We sent a Message!
2016-02-24 21:57:50,777 [198:61616@18542] - WARN  FailoverTransport - Transport (tcp://10.5.0.198:61616) failed, reason:  java.io.EOFException, attempting to automatically reconnect
Failover timeout of 1000 ms reached.
Failover timeout of 1000 ms reached.
Failover timeout of 1000 ms reached.

As you can see, the second time the broker went down, the send did not time out for more than an hour.

Here is the code I'm using:

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SimpleJMS {

    private final String connectionUri = "failover:(tcp://10.5.0.198:61616)?timeout=1000";
    private ActiveMQConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private Destination destination;

    // Open the connection and a non-transacted, auto-acknowledge session.
    public void before() throws Exception {
        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue("MyQueue");
    }

    public void after() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }

    // Send one message to the queue, then read one message back and print it.
    public void run() throws Exception {

        MessageProducer producer = session.createProducer(destination);
        try {
            TextMessage message = session.createTextMessage();
            message.setText("We sent a Message!");
            producer.send(message);
        } finally {
            producer.close();
        }

        MessageConsumer consumer = session.createConsumer(destination);
        try {
            // receive() without a timeout argument blocks until a message arrives.
            TextMessage message = (TextMessage) consumer.receive();
            System.out.println(message.getText());
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        SimpleJMS example = new SimpleJMS();
        System.out.print("\n\n\n");
        System.out.println("Starting SimpleJMS example now...");
        try {
            example.before();
            // One send/receive round trip per second, 1000 times.
            for (int i = 0; i < 1000; i++) {
                Thread.sleep(1000);
                try {
                    example.run();
                } catch (Exception e) {
                    System.out.println(e.getMessage());
                }
            }
            example.after();
        } catch (Exception e) {
            System.out.println("Caught an exception during the example: " + e.getMessage());
        }
        System.out.println("Finished running the SimpleJMS example.");
        System.out.print("\n\n\n");
    }
}

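I couldn't figure it out. After some research, it appears to be an issue in AMQ itself.

So, in order not to block the whole application, I switched to a networked broker that provides a local buffer, so that send operations keep going while the reconnect is being retried.

This is the best solution I have found so far.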
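
For readers who only want the "local buffer" effect inside the client, the idea can be roughly sketched in plain Java. This is not the networked-broker setup described above and not an ActiveMQ API: BufferedSender, its capacity parameter, and the delegate callback are hypothetical names, and the delegate is assumed to wrap whatever blocking send the application uses (for example a call to producer.send). The caller hands messages to a bounded in-memory queue and returns immediately, while a background thread performs the send that may block during a reconnect.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: buffers messages locally and forwards them on a background thread. */
final class BufferedSender implements AutoCloseable {
    private final BlockingQueue<String> buffer;
    private final Thread worker;
    private volatile boolean running = true;

    /**
     * @param capacity maximum number of messages held in the local buffer
     * @param delegate the real, possibly blocking send (an assumption of this sketch)
     */
    BufferedSender(int capacity, Consumer<String> delegate) {
        this.buffer = new LinkedBlockingQueue<>(capacity);
        this.worker = new Thread(() -> {
            // Keep draining until close() has been called and the buffer is empty.
            while (running || !buffer.isEmpty()) {
                try {
                    String msg = buffer.poll(100, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        delegate.accept(msg); // may block while the transport reconnects
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException e) {
                    // A failed send is dropped here; a real client would retry or log it.
                }
            }
        }, "buffered-sender");
        this.worker.setDaemon(true);
        this.worker.start();
    }

    /** Returns immediately; false means the buffer is full and the message was not accepted. */
    boolean offer(String message) {
        return buffer.offer(message);
    }

    /** Signals the worker to finish and waits up to one second for the buffer to drain. */
    @Override
    public void close() throws InterruptedException {
        running = false;
        worker.join(1000);
    }
}
```

The trade-off is that anything still in the buffer is lost if the client process dies, which a separate local broker with persistence would not have.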

I ran into the same challenge, and adding maxReconnectAttempts=2&maxReconnectDelay=5000 seems to help.
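
To make it concrete where those options go, here is a small sketch that assembles the failover URI. FailoverUri and build are hypothetical helper names, not part of ActiveMQ, and combining the options with the original timeout=1000 is an assumption, since the comment does not say whether the timeout was kept. The resulting string would be passed to ActiveMQConnectionFactory just like connectionUri in the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that assembles a failover URI from a broker address and options. */
final class FailoverUri {
    private FailoverUri() {}

    static String build(String brokerUri, Map<String, String> options) {
        // Join the options as key=value pairs separated by '&', in insertion order.
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : options.entrySet()) {
            query.add(e.getKey() + "=" + e.getValue());
        }
        String base = "failover:(" + brokerUri + ")";
        return options.isEmpty() ? base : base + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("timeout", "1000");             // from the question
        options.put("maxReconnectAttempts", "2");   // from the comment above
        options.put("maxReconnectDelay", "5000");   // from the comment above
        System.out.println(build("tcp://10.5.0.198:61616", options));
    }
}
```

Note that a finite maxReconnectAttempts means the transport gives up and reports an error to the client after that many failed attempts, so an application that must reconnect forever would presumably have to recreate the connection itself in its own retry loop.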