Netty如何测试使用客户端远程地址的处理程序

Netty how to test Handler which uses Remote Address of a client

我有一个带有 Spring Boot 2.3.1 的 Netty TCP 服务器和以下处理程序:

@Slf4j
@Component
@RequiredArgsConstructor
@ChannelHandler.Sharable
public class QrReaderProcessingHandler extends ChannelInboundHandlerAdapter {

    private final CarParkPermissionService permissionService;
    private final Gson gson = new Gson();

    private String remoteAddress;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.fireChannelActive();

        remoteAddress = ctx.channel().remoteAddress().toString();
        if (log.isDebugEnabled()) {
            log.debug(remoteAddress);
        }
        ctx.writeAndFlush("Your remote address is " + remoteAddress + ".\r\n");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        log.info("CLIENT_IP: {}", remoteAddress);

        String stringMsg = (String) msg;
        log.info("CLIENT_REQUEST: {}", stringMsg);

        String lowerCaseMsg = stringMsg.toLowerCase();

        if (RequestType.HEARTBEAT.containsName(lowerCaseMsg)) {
            HeartbeatRequest heartbeatRequest = gson.fromJson(stringMsg, HeartbeatRequest.class);
            log.debug("heartbeat request: {}", heartbeatRequest);

            HeartbeatResponse response = HeartbeatResponse.builder()
                .responseCode("ok")
                .build();
            ctx.writeAndFlush(response + "\n\r");
        }
    }

请求 DTO:

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class HeartbeatRequest {
    private String messageID;
}

响应 DTO:

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class HeartbeatResponse {
    private String responseCode;
}

逻辑很简单。只有我需要知道客户端的IP地址。

我也需要测试一下

我一直在寻找许多资源来测试 Netty 的处理程序,例如

然而,它对我不起作用。

对于 EmbeddedChannel,我有以下错误 - Your remote address is embedded.

代码如下:

@ActiveProfiles("test")
@RunWith(MockitoJUnitRunner.class)
public class ProcessingHandlerTest_Embedded {

    @Mock
    private PermissionService permissionService;
    private EmbeddedChannel embeddedChannel;
    private final Gson gson = new Gson();

    private ProcessingHandler processingHandler;


    @Before
    public void setUp() {
        processingHandler = new ProcessingHandler(permissionService);
        embeddedChannel = new EmbeddedChannel(processingHandler);
    }

    @Test
    public void testHeartbeatMessage() {
        // given
        HeartbeatRequest heartbeatMessage = HeartbeatRequest.builder()
                .messageID("heartbeat")
                .build();

        HeartbeatResponse response = HeartbeatResponse.builder()
                .responseCode("ok")
                .build();
        String request = gson.toJson(heartbeatMessage).concat("\r\n");
        String expected = gson.toJson(response).concat("\r\n");

        // when
        embeddedChannel.writeInbound(request);

        // then
        Queue<Object> outboundMessages = embeddedChannel.outboundMessages();
        assertEquals(expected, outboundMessages.poll());
    }
}

输出:

22:21:29.062 [main] INFO handler.ProcessingHandler - CLIENT_IP: embedded
22:21:29.062 [main] INFO handler.ProcessingHandler - CLIENT_REQUEST: {"messageID":"heartbeat"}

22:21:29.067 [main] DEBUG handler.ProcessingHandler - heartbeat request: HeartbeatRequest(messageID=heartbeat)

org.junit.ComparisonFailure: 
<Click to see difference>

但是,我不知道如何对这种情况进行精确测试。

这是配置的一个片段:

@Bean
@SneakyThrows
public InetSocketAddress tcpSocketAddress() {
    // for now, hostname is: localhost/127.0.0.1:9090
    return new InetSocketAddress("localhost", nettyProperties.getTcpPort());

    // for real client devices: A05264/172.28.1.162:9090
    // return new InetSocketAddress(InetAddress.getLocalHost(), nettyProperties.getTcpPort());
}

@Component
@RequiredArgsConstructor
public class QrReaderChannelInitializer extends ChannelInitializer<SocketChannel> {

    private final StringEncoder stringEncoder = new StringEncoder();
    private final StringDecoder stringDecoder = new StringDecoder();

    private final QrReaderProcessingHandler readerServerHandler;
    private final NettyProperties nettyProperties;

    @Override
    protected void initChannel(SocketChannel socketChannel) {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // Add the text line codec combination first
        pipeline.addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Delimiters.lineDelimiter()));

        pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
        pipeline.addLast(stringDecoder);
        pipeline.addLast(stringEncoder);
        pipeline.addLast(readerServerHandler);
    }
}

如何用客户端的IP地址测试处理程序?

两件事可以提供帮助:

  1. 如果您的处理程序不可共享,请不要使用 @ChannelHandler.Sharable 进行注释。这可能会产生误导。从处理程序中删除不必要的状态。在您的情况下,您应该删除 remoteAddress 成员变量并确保 GsonCarParkPermissionService 可以重复使用并且是 thread-safe.

  2. "Your remote address is embedded" 不是错误。它实际上是您的处理程序写入出站通道的消息(参见您的 channelActive() 方法)

所以看起来它可以工作。

编辑

在您的评论之后,我对第二点进行了一些澄清。我的意思是:

  • 您使用 EmbeddedChannel 的代码几乎是正确的。只是对预期结果有误解(断言)。

要使单元测试成功,您只需:

  • channelActive()中评论这一行:ctx.writeAndFlush("Your remote ...")
  • 或在testHeartbeatMessage()
  • 中轮询来自Queue<Object> outboundMessages的第二条消息

确实,当你这样做时:

// when
embeddedChannel.writeInbound(request);

(1) 你实际上打开了一次频道,它触发了一个 channelActive() 事件。您没有登录,但我们看到变量 remoteAddress 之后不为空,这意味着它是在 channelActive() 方法中分配的。

(2)channelActive() 方法的末尾,您最终已经通过在通道管道上写入来发回一条消息,如这一行所示:

ctx.writeAndFlush("Your remote address is " + remoteAddress + ".\r\n");
// In fact, this is the message you see in your failed assertion.

(3) 然后收到embeddedChannel.writeInbound(request)写的消息,可以读取,触发了channelRead()事件。这一次,我们在您的日志输出中看到:

22:21:29.062 [main] INFO handler.ProcessingHandler - CLIENT_IP: embedded
22:21:29.062 [main] INFO handler.ProcessingHandler - CLIENT_REQUEST: {"messageID":"heartbeat"}
22:21:29.067 [main] DEBUG handler.ProcessingHandler - heartbeat request: HeartbeatRequest(messageID=heartbeat)

(4)channelRead(ChannelHandlerContext ctx, Object msg) 结束时,您将发送 second 消息(预期的消息) :

HeartbeatResponse response = HeartbeatResponse.builder()
     .responseCode("ok")
     .build();
ctx.writeAndFlush(response + "\n\r");

因此,使用以下单元测试代码...

Queue<Object> outboundMessages = embeddedChannel.outboundMessages();
assertEquals(expected, outboundMessages.poll());

...您应该能够 poll() 两个 消息:

  • "Your remote address is embedded"
  • "{ResponseCode":"ok"}

你觉得有意义吗?