Netty如何测试使用客户端远程地址的处理程序
Netty how to test Handler which uses Remote Address of a client
我有一个带有 Spring Boot 2.3.1
的 Netty TCP 服务器和以下处理程序:
@Slf4j
@Component
@RequiredArgsConstructor
@ChannelHandler.Sharable
public class QrReaderProcessingHandler extends ChannelInboundHandlerAdapter {
private final CarParkPermissionService permissionService;
private final Gson gson = new Gson();
private String remoteAddress;
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
remoteAddress = ctx.channel().remoteAddress().toString();
if (log.isDebugEnabled()) {
log.debug(remoteAddress);
}
ctx.writeAndFlush("Your remote address is " + remoteAddress + ".\r\n");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.info("CLIENT_IP: {}", remoteAddress);
String stringMsg = (String) msg;
log.info("CLIENT_REQUEST: {}", stringMsg);
String lowerCaseMsg = stringMsg.toLowerCase();
if (RequestType.HEARTBEAT.containsName(lowerCaseMsg)) {
HeartbeatRequest heartbeatRequest = gson.fromJson(stringMsg, HeartbeatRequest.class);
log.debug("heartbeat request: {}", heartbeatRequest);
HeartbeatResponse response = HeartbeatResponse.builder()
.responseCode("ok")
.build();
ctx.writeAndFlush(response + "\n\r");
}
}
请求 DTO:
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class HeartbeatRequest {
private String messageID;
}
响应 DTO:
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class HeartbeatResponse {
private String responseCode;
}
逻辑很简单。只有我需要知道客户端的IP地址。
我也需要测试一下
我一直在寻找许多资源来测试 Netty 的处理程序,例如
- Testing Netty with EmbeddedChannel
- How to unit test netty handler
然而,它对我不起作用。
对于 EmbeddedChannel,我有以下错误 - Your remote address is embedded.
代码如下:
@ActiveProfiles("test")
@RunWith(MockitoJUnitRunner.class)
public class ProcessingHandlerTest_Embedded {
@Mock
private PermissionService permissionService;
private EmbeddedChannel embeddedChannel;
private final Gson gson = new Gson();
private ProcessingHandler processingHandler;
@Before
public void setUp() {
processingHandler = new ProcessingHandler(permissionService);
embeddedChannel = new EmbeddedChannel(processingHandler);
}
@Test
public void testHeartbeatMessage() {
// given
HeartbeatRequest heartbeatMessage = HeartbeatRequest.builder()
.messageID("heartbeat")
.build();
HeartbeatResponse response = HeartbeatResponse.builder()
.responseCode("ok")
.build();
String request = gson.toJson(heartbeatMessage).concat("\r\n");
String expected = gson.toJson(response).concat("\r\n");
// when
embeddedChannel.writeInbound(request);
// then
Queue<Object> outboundMessages = embeddedChannel.outboundMessages();
assertEquals(expected, outboundMessages.poll());
}
}
输出:
22:21:29.062 [main] INFO handler.ProcessingHandler - CLIENT_IP: embedded
22:21:29.062 [main] INFO handler.ProcessingHandler - CLIENT_REQUEST: {"messageID":"heartbeat"}
22:21:29.067 [main] DEBUG handler.ProcessingHandler - heartbeat request: HeartbeatRequest(messageID=heartbeat)
org.junit.ComparisonFailure:
<Click to see difference>
但是,我不知道如何对这种情况进行精确测试。
这是配置的一个片段:
@Bean
@SneakyThrows
public InetSocketAddress tcpSocketAddress() {
// for now, hostname is: localhost/127.0.0.1:9090
return new InetSocketAddress("localhost", nettyProperties.getTcpPort());
// for real client devices: A05264/172.28.1.162:9090
// return new InetSocketAddress(InetAddress.getLocalHost(), nettyProperties.getTcpPort());
}
@Component
@RequiredArgsConstructor
public class QrReaderChannelInitializer extends ChannelInitializer<SocketChannel> {
private final StringEncoder stringEncoder = new StringEncoder();
private final StringDecoder stringDecoder = new StringDecoder();
private final QrReaderProcessingHandler readerServerHandler;
private final NettyProperties nettyProperties;
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
// Add the text line codec combination first
pipeline.addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Delimiters.lineDelimiter()));
pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
pipeline.addLast(stringDecoder);
pipeline.addLast(stringEncoder);
pipeline.addLast(readerServerHandler);
}
}
如何用客户端的IP地址测试处理程序?
两件事可以提供帮助:
如果您的处理程序不可共享,请不要使用 @ChannelHandler.Sharable
进行注释。这可能会产生误导。从处理程序中删除不必要的状态。在您的情况下,您应该删除 remoteAddress
成员变量并确保 Gson
和 CarParkPermissionService
可以重复使用并且是 thread-safe.
"Your remote address is embedded"
不是错误。它实际上是您的处理程序写入出站通道的消息(参见您的 channelActive()
方法)
所以看起来它可以工作。
编辑
在您的评论之后,我对第二点进行了一些澄清。我的意思是:
- 您使用
EmbeddedChannel
的代码几乎是正确的。只是对预期结果有误解(断言)。
要使单元测试成功,您只需:
- 在
channelActive()
中评论这一行:ctx.writeAndFlush("Your remote ...")
- 或在
testHeartbeatMessage()
中轮询来自Queue<Object> outboundMessages
的第二条消息
确实,当你这样做时:
// when
embeddedChannel.writeInbound(request);
(1) 你实际上打开了一次频道,它触发了一个 channelActive()
事件。您没有登录,但我们看到变量 remoteAddress
之后不为空,这意味着它是在 channelActive()
方法中分配的。
(2) 在 channelActive()
方法的末尾,您最终已经通过在通道管道上写入来发回一条消息,如这一行所示:
ctx.writeAndFlush("Your remote address is " + remoteAddress + ".\r\n");
// In fact, this is the message you see in your failed assertion.
(3) 然后收到embeddedChannel.writeInbound(request)
写的消息,可以读取,触发了channelRead()
事件。这一次,我们在您的日志输出中看到:
22:21:29.062 [main] INFO handler.ProcessingHandler - CLIENT_IP: embedded
22:21:29.062 [main] INFO handler.ProcessingHandler - CLIENT_REQUEST: {"messageID":"heartbeat"}
22:21:29.067 [main] DEBUG handler.ProcessingHandler - heartbeat request: HeartbeatRequest(messageID=heartbeat)
(4) 在 channelRead(ChannelHandlerContext ctx, Object msg)
结束时,您将发送 second 消息(预期的消息) :
HeartbeatResponse response = HeartbeatResponse.builder()
.responseCode("ok")
.build();
ctx.writeAndFlush(response + "\n\r");
因此,使用以下单元测试代码...
Queue<Object> outboundMessages = embeddedChannel.outboundMessages();
assertEquals(expected, outboundMessages.poll());
...您应该能够 poll()
两个 消息:
"Your remote address is embedded"
"{ResponseCode":"ok"}
你觉得有意义吗?
我有一个带有 Spring Boot 2.3.1
的 Netty TCP 服务器和以下处理程序:
@Slf4j
@Component
@RequiredArgsConstructor
@ChannelHandler.Sharable
public class QrReaderProcessingHandler extends ChannelInboundHandlerAdapter {
private final CarParkPermissionService permissionService;
private final Gson gson = new Gson();
private String remoteAddress;
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
remoteAddress = ctx.channel().remoteAddress().toString();
if (log.isDebugEnabled()) {
log.debug(remoteAddress);
}
ctx.writeAndFlush("Your remote address is " + remoteAddress + ".\r\n");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.info("CLIENT_IP: {}", remoteAddress);
String stringMsg = (String) msg;
log.info("CLIENT_REQUEST: {}", stringMsg);
String lowerCaseMsg = stringMsg.toLowerCase();
if (RequestType.HEARTBEAT.containsName(lowerCaseMsg)) {
HeartbeatRequest heartbeatRequest = gson.fromJson(stringMsg, HeartbeatRequest.class);
log.debug("heartbeat request: {}", heartbeatRequest);
HeartbeatResponse response = HeartbeatResponse.builder()
.responseCode("ok")
.build();
ctx.writeAndFlush(response + "\n\r");
}
}
请求 DTO:
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class HeartbeatRequest {
private String messageID;
}
响应 DTO:
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class HeartbeatResponse {
private String responseCode;
}
逻辑很简单。只有我需要知道客户端的IP地址。
我也需要测试一下
我一直在寻找许多资源来测试 Netty 的处理程序,例如
- Testing Netty with EmbeddedChannel
- How to unit test netty handler
然而,它对我不起作用。
对于 EmbeddedChannel,我有以下错误 - Your remote address is embedded.
代码如下:
@ActiveProfiles("test")
@RunWith(MockitoJUnitRunner.class)
public class ProcessingHandlerTest_Embedded {
@Mock
private PermissionService permissionService;
private EmbeddedChannel embeddedChannel;
private final Gson gson = new Gson();
private ProcessingHandler processingHandler;
@Before
public void setUp() {
processingHandler = new ProcessingHandler(permissionService);
embeddedChannel = new EmbeddedChannel(processingHandler);
}
@Test
public void testHeartbeatMessage() {
// given
HeartbeatRequest heartbeatMessage = HeartbeatRequest.builder()
.messageID("heartbeat")
.build();
HeartbeatResponse response = HeartbeatResponse.builder()
.responseCode("ok")
.build();
String request = gson.toJson(heartbeatMessage).concat("\r\n");
String expected = gson.toJson(response).concat("\r\n");
// when
embeddedChannel.writeInbound(request);
// then
Queue<Object> outboundMessages = embeddedChannel.outboundMessages();
assertEquals(expected, outboundMessages.poll());
}
}
输出:
22:21:29.062 [main] INFO handler.ProcessingHandler - CLIENT_IP: embedded
22:21:29.062 [main] INFO handler.ProcessingHandler - CLIENT_REQUEST: {"messageID":"heartbeat"}
22:21:29.067 [main] DEBUG handler.ProcessingHandler - heartbeat request: HeartbeatRequest(messageID=heartbeat)
org.junit.ComparisonFailure:
<Click to see difference>
但是,我不知道如何对这种情况进行精确测试。
这是配置的一个片段:
@Bean
@SneakyThrows
public InetSocketAddress tcpSocketAddress() {
// for now, hostname is: localhost/127.0.0.1:9090
return new InetSocketAddress("localhost", nettyProperties.getTcpPort());
// for real client devices: A05264/172.28.1.162:9090
// return new InetSocketAddress(InetAddress.getLocalHost(), nettyProperties.getTcpPort());
}
@Component
@RequiredArgsConstructor
public class QrReaderChannelInitializer extends ChannelInitializer<SocketChannel> {
private final StringEncoder stringEncoder = new StringEncoder();
private final StringDecoder stringDecoder = new StringDecoder();
private final QrReaderProcessingHandler readerServerHandler;
private final NettyProperties nettyProperties;
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
// Add the text line codec combination first
pipeline.addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Delimiters.lineDelimiter()));
pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
pipeline.addLast(stringDecoder);
pipeline.addLast(stringEncoder);
pipeline.addLast(readerServerHandler);
}
}
如何用客户端的IP地址测试处理程序?
两件事可以提供帮助:
如果您的处理程序不可共享,请不要使用
@ChannelHandler.Sharable
进行注释。这可能会产生误导。从处理程序中删除不必要的状态。在您的情况下,您应该删除remoteAddress
成员变量并确保Gson
和CarParkPermissionService
可以重复使用并且是 thread-safe."Your remote address is embedded"
不是错误。它实际上是您的处理程序写入出站通道的消息(参见您的channelActive()
方法)
所以看起来它可以工作。
编辑
在您的评论之后,我对第二点进行了一些澄清。我的意思是:
- 您使用
EmbeddedChannel
的代码几乎是正确的。只是对预期结果有误解(断言)。
要使单元测试成功,您只需:
- 在
channelActive()
中评论这一行:ctx.writeAndFlush("Your remote ...")
- 或在
testHeartbeatMessage()
中轮询来自
Queue<Object> outboundMessages
的第二条消息
确实,当你这样做时:
// when
embeddedChannel.writeInbound(request);
(1) 你实际上打开了一次频道,它触发了一个 channelActive()
事件。您没有登录,但我们看到变量 remoteAddress
之后不为空,这意味着它是在 channelActive()
方法中分配的。
(2) 在 channelActive()
方法的末尾,您最终已经通过在通道管道上写入来发回一条消息,如这一行所示:
ctx.writeAndFlush("Your remote address is " + remoteAddress + ".\r\n");
// In fact, this is the message you see in your failed assertion.
(3) 然后收到embeddedChannel.writeInbound(request)
写的消息,可以读取,触发了channelRead()
事件。这一次,我们在您的日志输出中看到:
22:21:29.062 [main] INFO handler.ProcessingHandler - CLIENT_IP: embedded
22:21:29.062 [main] INFO handler.ProcessingHandler - CLIENT_REQUEST: {"messageID":"heartbeat"}
22:21:29.067 [main] DEBUG handler.ProcessingHandler - heartbeat request: HeartbeatRequest(messageID=heartbeat)
(4) 在 channelRead(ChannelHandlerContext ctx, Object msg)
结束时,您将发送 second 消息(预期的消息) :
HeartbeatResponse response = HeartbeatResponse.builder()
.responseCode("ok")
.build();
ctx.writeAndFlush(response + "\n\r");
因此,使用以下单元测试代码...
Queue<Object> outboundMessages = embeddedChannel.outboundMessages();
assertEquals(expected, outboundMessages.poll());
...您应该能够 poll()
两个 消息:
"Your remote address is embedded"
"{ResponseCode":"ok"}
你觉得有意义吗?