IntegrationFlow Exception Handling

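I am trying to create a flow for messages as follows:

TCPinboundAdapter ----> message broker (ActiveMQ)

Flow:

The flow is created as follows:

  1. A message is received over a TCP connection by the TCP adapter, which can be a client or a server.
  2. The message received by the TCP adapter is sent to the JMS adapter (ActiveMQ broker).

The code is as follows: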

    @EventListener
    public void handleTcpConnectionClientEvent(TcpConnectionFailedEvent event) {
        TcpNioClientConnectionFactory tcp = (TcpNioClientConnectionFactory) event.getSource();
        System.out.println(tcp);
        System.out.println("connection exception client :::" + event.getSource());

        this.status = event.toString();
    }

    @EventListener
    public void handleTcpConnectionServerExceptionEvent(TcpConnectionServerExceptionEvent event) {
        System.out.println("connection exception server :::");

        this.status = event.toString();
    }

    // this method is invoked when the connection with the server gets disconnected
    @EventListener
    public void handleTcpConnectionServerEvent(TcpConnectionExceptionEvent event) {
        System.out.println("connection exception serversssss :::" + event.getConnectionFactoryName());
        this.status = event.toString();
    }

    // when the connection gets established (not for the first time)
    @EventListener
    public void handleTcpConnectionCloseEvent(TcpConnectionOpenEvent event) {
        System.out.println("connection opened :::" + event.getConnectionFactoryName());
        // status = event.toString();
    }

    // create a server connection and a flow to JMS
    private void createServerConnection(HostConnection hostConnection) throws Throwable {
        this.status = "success";

        IntegrationFlow flow =
                IntegrationFlows.from(Tcp.inboundAdapter(Tcp.netServer(1234)
                        .serializer(customSerializer)
                        .deserializer(customSerializer)
                        .id(hostConnection.getConnectionNumber())
                        .soTimeout(10000)))
                .enrichHeaders(f -> f.header("abc", "abc"))
                .channel(directChannel())
                .handle(Jms.outboundAdapter(ConnectionFactory())
                        .destination("jmsInbound"))
                .get();

        IntegrationFlowRegistration theFlow =
                this.flowContext.registration(flow).id("test.flow").register();

        if (this.status.equals("success")) {
            createInboundFlow(hostConnection);
        }

        // startConnection(hostConnection.getConnectionNumber());
    }

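Problem:

The flow is created successfully and registered in the application context when no exception occurs. But if an exception occurs, i.e. a BindException:

  1. When a server is created for a specific port and that port is already in use, a BindException is raised, yet the flow is still registered. We want the flow not to be registered when any of the flow components below throws an exception.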

    IntegrationFlowRegistration theFlow;
    IntegrationFlow flow =
            IntegrationFlows.from(Tcp.inboundAdapter(Tcp.netServer(1234)
                    .serializer(customSerializer)
                    .deserializer(customSerializer)
                    .id("server")
                    .soTimeout(10000)))
            .enrichHeaders(f -> f.header("abc", "abc"))
            .channel(directChannel())
            .handle(Jms.outboundAdapter(ConnectionFactory())
                    .destination("jmsInbound"))
            .get();

    theFlow = this.flowContext.registration(flow).id("test.flow").register();
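
One way to make the "port already in use" case less likely to reach the flow at all is to probe the port before building and registering the flow. The following is a minimal plain-Java sketch, not part of Spring Integration; `PortProbe` and `isPortFree` are hypothetical names. The check is inherently racy (another process can take the port between the probe and the real bind), so it can only reduce the problem, not eliminate it.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical helper for illustration; not a Spring Integration API.
final class PortProbe {

    private PortProbe() {
    }

    /**
     * Tries to bind a throwaway ServerSocket to the port and closes it again.
     * Returns true only if the bind succeeded at the moment of the call.
     */
    static boolean isPortFree(int port) {
        try (ServerSocket probe = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            // java.net.BindException is a subclass of IOException
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 0 asks the OS for any free port, which we then hold open.
        try (ServerSocket occupied = new ServerSocket(0)) {
            int port = occupied.getLocalPort();
            System.out.println("port " + port + " free while held? " + isPortFree(port));
        }
    }
}
```

In the code above, such a check would sit at the top of `createServerConnection`, before `register()` is called.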
    

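There are various listeners for detecting exceptions on the TCP connection, but the try{}catch() block does not catch any exception.

Please suggest a suitable way to handle exceptions for the current adapter. I am using listeners for various events to find out when there is a problem with the TCP adapter.

After applying the approach suggested by Mr. Artem Bilan: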

    @EventListener
    public void handleTcpConnectionServerExceptionEvent(TcpConnectionServerExceptionEvent event) {
        System.out.println("connection exception server :::" + event);
        this.status = event.getCause().getMessage();
        AbstractConnectionFactory server = (AbstractConnectionFactory) event.getSource();
        System.out.println(server.getComponentName());
        this.flowContext.remove(server.getComponentName() + "out.flow");
    }

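I can remove the flow using its flow id, but I cannot catch the exception. The exception below is printed on the console and cannot be handled, even though I changed the method to

    private void createServerConnection(HostConnection hostConnection) throws Throwable{}

and wrapped the call to it in try{}catch(Throwable t){} to handle these exceptions.

    Exception in thread "pool-4-thread-1" java.lang.NullPointerException

The exception is shown in more detail in the log below: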

2018-05-17 21:01:40.850  INFO 18332 --- [nio-8080-exec-4] .s.i.i.t.c.TcpNetServerConnectionFactory : started Co123, port=1234
2018-05-17 21:01:40.850  INFO 18332 --- [nio-8080-exec-4] o.s.i.ip.tcp.TcpReceivingChannelAdapter  : started org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter#3
2018-05-17 21:01:40.851 ERROR 18332 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : Error on ServerSocket; port = 1234

java.net.BindException: Address already in use: JVM_Bind
    at java.net.DualStackPlainSocketImpl.bind0(Native Method) ~[na:1.8.0_111]
    at java.net.DualStackPlainSocketImpl.socketBind(Unknown Source) ~[na:1.8.0_111]
    at java.net.AbstractPlainSocketImpl.bind(Unknown Source) ~[na:1.8.0_111]
    at java.net.PlainSocketImpl.bind(Unknown Source) ~[na:1.8.0_111]
    at java.net.ServerSocket.bind(Unknown Source) ~[na:1.8.0_111]
    at java.net.ServerSocket.<init>(Unknown Source) ~[na:1.8.0_111]
    at java.net.ServerSocket.<init>(Unknown Source) ~[na:1.8.0_111]
    at javax.net.DefaultServerSocketFactory.createServerSocket(Unknown Source) ~[na:1.8.0_111]
    at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.createServerSocket(TcpNetServerConnectionFactory.java:211) ~[spring-integration-ip-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.run(TcpNetServerConnectionFactory.java:106) ~[spring-integration-ip-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.8.0_111]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_111]

connection exception server :::TcpConnectionServerExceptionEvent [source=Co123, port=1234, cause=java.net.BindException: Address already in use: JVM_Bind]
Co123
2018-05-17 21:01:40.851  INFO 18332 --- [pool-5-thread-1] o.s.i.ip.tcp.TcpReceivingChannelAdapter  : stopped org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter#3
2018-05-17 21:01:40.851  INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer       : Removing {transformer} as a subscriber to the 'Co123out.flow.channel#0' channel
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.integration.channel.DirectChannel    : Channel 'application.Co123out.flow.channel#0' has 0 subscriber(s).
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer       : stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#11
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer       : Removing {jms:outbound-channel-adapter} as a subscriber to the 'Co123out.flow.channel#1' channel
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.integration.channel.DirectChannel    : Channel 'application.Co123out.flow.channel#1' has 0 subscriber(s).
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer       : stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#12
Exception in thread "pool-4-thread-1" java.lang.NullPointerException
    at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.run(TcpNetServerConnectionFactory.java:185)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Exception in thread "pool-5-thread-1" java.lang.NullPointerException
    at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.run(TcpNetServerConnectionFactory.java:185)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
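
The thread names in the log hint at why a try{}catch(Throwable t){} around the call never fires: the start is logged on `nio-8080-exec-4`, while the BindException and the NullPointerException appear on `pool-5-thread-1` and `pool-4-thread-1`. An exception thrown on an executor thread does not travel back to the thread that submitted the work. The plain-Java sketch below reproduces just that effect with no Spring involved; `AsyncFailureDemo`, `Outcome`, and the simulated failure are hypothetical stand-ins.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; it only illustrates cross-thread exception behavior.
final class AsyncFailureDemo {

    /** What the caller's catch block saw versus what the worker thread reported. */
    static final class Outcome {
        final boolean caughtByCaller;
        final Throwable seenOnWorker;

        Outcome(boolean caughtByCaller, Throwable seenOnWorker) {
            this.caughtByCaller = caughtByCaller;
            this.seenOnWorker = seenOnWorker;
        }
    }

    static Outcome startAndObserve() {
        AtomicReference<Throwable> seenOnWorker = new AtomicReference<>();
        CountDownLatch workerFailed = new CountDownLatch(1);

        // Worker threads report uncaught exceptions through this handler.
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread worker = new Thread(runnable, "demo-worker-1");
            worker.setUncaughtExceptionHandler((thread, ex) -> {
                seenOnWorker.set(ex);
                workerFailed.countDown();
            });
            return worker;
        });

        boolean caughtByCaller = false;
        try {
            // execute() returns immediately; the failure happens later, on the worker.
            executor.execute(() -> {
                throw new IllegalStateException("simulated bind failure");
            });
        } catch (Throwable ignored) {
            caughtByCaller = true; // not reached for failures inside the task
        }

        try {
            workerFailed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return new Outcome(caughtByCaller, seenOnWorker.get());
    }

    public static void main(String[] args) {
        Outcome outcome = startAndObserve();
        System.out.println("caught by caller: " + outcome.caughtByCaller);
        System.out.println("seen on worker:   " + outcome.seenOnWorker);
    }
}
```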

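You register the IntegrationFlow this way:

    this.flowContext.registration(flow).id("test.flow").register();

The same this.flowContext bean and the flow's id can be used to destroy the flow from anywhere else, e.g. from an event listener, when you catch the mentioned BindException: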

    /**
     * Destroy an {@link IntegrationFlow} bean (as well as all its dependant beans)
     * for provided {@code flowId} and clean up all the local cache for it.
     * @param flowId the bean name to destroy from
     */
    void remove(String flowId);
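
Because the failure event arrives on another thread, a status field read immediately after `register()` may not have been updated yet. One possible way to let the creating method react is to have the listener hand the cause back through a `CompletableFuture` that the creator waits on for a short time. This is a plain-Java sketch under that assumption: `StartupGate` and its methods are hypothetical names, and `reportFailure` stands in for the event listener that would also call `flowContext.remove(flowId)`.

```java
import java.net.BindException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; not part of Spring Integration.
final class StartupGate {

    // flowId -> future completed by the failure listener with the cause
    private final Map<String, CompletableFuture<Throwable>> pending = new ConcurrentHashMap<>();

    /** Call before registering the flow; keep the returned future for awaitFailure(). */
    CompletableFuture<Throwable> expect(String flowId) {
        return pending.computeIfAbsent(flowId, id -> new CompletableFuture<>());
    }

    /** Call from the failure listener, on whatever thread the event arrives. */
    void reportFailure(String flowId, Throwable cause) {
        CompletableFuture<Throwable> failure = pending.remove(flowId);
        if (failure != null) {
            failure.complete(cause);
        }
    }

    /** Returns the reported cause, or null if nothing was reported within the timeout. */
    Throwable awaitFailure(String flowId, CompletableFuture<Throwable> failure, long timeoutMillis) {
        try {
            return failure.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // no failure seen in time; the caller treats the flow as started
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } finally {
            pending.remove(flowId);
        }
    }

    public static void main(String[] args) {
        StartupGate gate = new StartupGate();
        CompletableFuture<Throwable> failure = gate.expect("test.flow");

        // Simulates the listener thread receiving the server exception event.
        new Thread(() -> gate.reportFailure("test.flow",
                new BindException("Address already in use"))).start();

        Throwable cause = gate.awaitFailure("test.flow", failure, 2000);
        System.out.println(cause == null ? "flow started" : "flow failed: " + cause);
    }
}
```

The timeout is a heuristic: a longer wait catches slower failures at the cost of delaying every successful start.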