如何单元测试grpc-java服务器实现功能?

How to unit test grpc-java server implementation functions?

我有一个 GRPC-java 服务器代码的实现,但我没有找到对 StreamObserver 进行单元测试的示例代码。有谁知道对函数进行单元测试的正确方法吗?

public class RpcTrackDataServiceImpl implements TrackDataServiceGrpc.TrackDataService {
    @Override
    public void getTracks(GetTracksRequest request, StreamObserver < GetTracksResponse > responseObserver) {
        GetTracksResponse reply = GetTracksResponse
            .newBuilder()
            .addTracks(TrackInfo.newBuilder()
                .setOwner("test")
                .setTrackName("test")
                .build())
            .build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }
}

我建议使用 InProcess 传输。 InProcess 传输非常轻量级,但也使用了很多 "real" 代码,因此其行为与真实传输非常匹配。如果您还对 Channel 和 Server 使用 directExecutor(),那么测试本质上是单线程的,并且是确定性的。 (尽管另一个线程仍将用于截止日期处理。)

虽然问题是对服务进行单元测试,但 InProcess 也非常适合对客户端进行单元测试。

我最终找到了创建实现 StreamObserver 接口的 FakeStreamObserver 的解决方案。
传入FakeStreamObserver执行onNext,onCompleted等
我不确定这是否是最好的方法。

使用上面 Eric 提到的 InProcess 传输,单元测试非常简单。这是一个更明确的代码示例:

我们根据这个 protobuff 定义测试服务:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "servers.dummy";
option java_outer_classname = "DummyProto";
option objc_class_prefix = "DMYS";

package dummy;

import "general.proto";

// The dummy service definition.
service DummyService {
  // # Misc
  // Returns the server version
  rpc getVersion (Empty) returns (ServerVersion) {}
  // Returns the java version
  rpc getJava (Empty) returns (JavaVersion) {}
}


// Transmission data types

(上面包含以下文件:)

syntax = "proto3";

option java_multiple_files = true;
option java_package = "general";
option java_outer_classname = "General";
option objc_class_prefix = "G";

// Transmission data types

message Empty {} // Empty Request or Reply

message ServerVersion {
  string version = 1;
}

message JavaVersion {
  string version = 1;
}

基于 Protoc 编译器生成的 Java 的 DummyService 如下:

package servers.dummy;

import java.util.logging.Logger;

import general.Empty;
import general.JavaVersion;
import general.ServerVersion;
import io.grpc.stub.StreamObserver;

public class DummyService extends DummyServiceGrpc.DummyServiceImplBase {
  private static final Logger logger = Logger.getLogger(DummyService.class.getName());

  @Override
  public void getVersion(Empty req, StreamObserver<ServerVersion> responseObserver) {
    logger.info("Server Version-Request received...");
    ServerVersion version = ServerVersion.newBuilder().setVersion("1.0.0").build();
    responseObserver.onNext(version);
    responseObserver.onCompleted();
  }

  @Override
  public void getJava(Empty req, StreamObserver<JavaVersion> responseObserver) {
    logger.info("Java Version Request received...");
    JavaVersion version = JavaVersion.newBuilder().setVersion(Runtime.class.getPackage().getImplementationVersion() + " (" + Runtime.class.getPackage().getImplementationVendor() + ")").build();
    responseObserver.onNext(version);
    responseObserver.onCompleted();
  }
}

现在我们构建一个 InProcessServer 来运行我们的 Dummy 服务(或您要测试的任何其他服务):

package servers;

import io.grpc.Server;
import io.grpc.inprocess.InProcessServerBuilder;

import java.io.IOException;
import java.util.logging.Logger;

import servers.util.PortServer;

/**
 * InProcessServer that manages startup/shutdown of a service within the same process as the client is running. Used for unit testing purposes.
 * @author be
 */
public class InProcessServer<T extends io.grpc.BindableService> {
  private static final Logger logger = Logger.getLogger(PortServer.class.getName());

  private Server server;
    
  private Class<T> clazz;
    
  public InProcessServer(Class<T> clazz){
    this.clazz = clazz;
  }

  public void start() throws IOException, InstantiationException, IllegalAccessException {
    server = InProcessServerBuilder
        .forName("test")
        .directExecutor()
        .addService(clazz.newInstance())
        .build()
        .start();
    logger.info("InProcessServer started.");
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        // Use stderr here since the logger may have been reset by its JVM shutdown hook.
        System.err.println("*** shutting down gRPC server since JVM is shutting down");
        InProcessServer.this.stop();
        System.err.println("*** server shut down");
      }
    });
  }

  void stop() {
    if (server != null) {
      server.shutdown();
    }
  }

  /**
   * Await termination on the main thread since the grpc library uses daemon threads.
   */
  public void blockUntilShutdown() throws InterruptedException {
    if (server != null) {
      server.awaitTermination();
    }
  }
}

我们现在可以使用以下单元测试来测试服务:

package servers;

import static org.junit.Assert.*;
import general.ServerVersion;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import servers.dummy.DummyService;
import servers.dummy.DummyServiceGrpc;
import servers.dummy.DummyServiceGrpc.DummyServiceBlockingStub;
import servers.dummy.DummyServiceGrpc.DummyServiceStub;

public class InProcessServerTest {
  private static final Logger logger = Logger.getLogger(InProcessServerTest.class.getName());

  private InProcessServer<DummyService> inprocessServer;
  private ManagedChannel channel;
  private DummyServiceBlockingStub blockingStub;
  private DummyServiceStub asyncStub;
    
  public InProcessServerTest() {
    super();
  }
    
  @Test
  public void testInProcessServer() throws InterruptedException{
    try {
      String version = getServerVersion();
      assertEquals("1.0.0", version);
    } finally {
      shutdown();
    }
  }

  /** Ask for the server version */
  public String getServerVersion() {
    logger.info("Will try to get server version...");
    ServerVersion response;
    try {
      response = blockingStub.getVersion(null);
    } catch (StatusRuntimeException e) {
      logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      fail();
      return "";
    }
    return response.getVersion();
  }

  @Before
  public void beforeEachTest() throws InstantiationException, IllegalAccessException, IOException {
    inprocessServer = new InProcessServer<DummyService>(DummyService.class);
    inprocessServer.start();       
    channel = InProcessChannelBuilder
        .forName("test")
        .directExecutor()
        // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
        // needing certificates.
        .usePlaintext(true)
        .build();
    blockingStub = DummyServiceGrpc.newBlockingStub(channel);
    asyncStub = DummyServiceGrpc.newStub(channel);
  }

  @After
  public void afterEachTest(){
  channel.shutdownNow();
    inprocessServer.stop();
  }
    
  public void shutdown() throws InterruptedException {
    channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
  }
}

该测试仅测试两种方法中的一种,因为它仅用于说明目的。另一种方法可以相应测试。

有关如何同时测试服务器和客户端的更多信息,请参阅 RouteGuideExample: https://github.com/grpc/grpc-java/blob/master/examples/src/test/java/io/grpc/examples/routeguide/RouteGuideServerTest.java

我将插入官方 gRPC 示例中的片段。我已经基于这些 HelloWorld 示例成功创建了测试。

HelloWorldService:

/*
 * Copyright 2015, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.examples.helloworld;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.logging.Logger;

/**
 * Server that manages startup/shutdown of a {@code Greeter} server.
 */
public class HelloWorldServer {
  private static final Logger logger = Logger.getLogger(HelloWorldServer.class.getName());

  private Server server;

  private void start() throws IOException {
    /* The port on which the server should run */
    int port = 50051;
    server = ServerBuilder.forPort(port)
        .addService(new GreeterImpl())
        .build()
        .start();
    logger.info("Server started, listening on " + port);
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        // Use stderr here since the logger may have been reset by its JVM shutdown hook.
        System.err.println("*** shutting down gRPC server since JVM is shutting down");
        HelloWorldServer.this.stop();
        System.err.println("*** server shut down");
      }
    });
  }

  private void stop() {
    if (server != null) {
      server.shutdown();
    }
  }

  /**
   * Await termination on the main thread since the grpc library uses daemon threads.
   */
  private void blockUntilShutdown() throws InterruptedException {
    if (server != null) {
      server.awaitTermination();
    }
  }

  /**
   * Main launches the server from the command line.
   */
  public static void main(String[] args) throws IOException, InterruptedException {
    final HelloWorldServer server = new HelloWorldServer();
    server.start();
    server.blockUntilShutdown();
  }

  static class GreeterImpl extends GreeterGrpc.GreeterImplBase {

    @Override
    public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
      HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
      responseObserver.onNext(reply);
      responseObserver.onCompleted();
    }
  }
}

测试:

/*
 * Copyright 2016, gRPC Authors All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.examples.helloworld;

import static org.junit.Assert.assertEquals;

import io.grpc.examples.helloworld.HelloWorldServer.GreeterImpl;
import io.grpc.testing.GrpcServerRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Unit tests for {@link HelloWorldServer}.
 * For demonstrating how to write gRPC unit test only.
 * Not intended to provide a high code coverage or to test every major usecase.
 *
 * <p>For more unit test examples see {@link io.grpc.examples.routeguide.RouteGuideClientTest} and
 * {@link io.grpc.examples.routeguide.RouteGuideServerTest}.
 */
@RunWith(JUnit4.class)
public class HelloWorldServerTest {
  /**
   * This creates and starts an in-process server, and creates a client with an in-process channel.
   * When the test is done, it also shuts down the in-process client and server.
   */
  @Rule
  public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();

  /**
   * To test the server, make calls with a real stub using the in-process channel, and verify
   * behaviors or state changes from the client side.
   */
  @Test
  public void greeterImpl_replyMessage() throws Exception {
    // Add the service to the in-process server.
    grpcServerRule.getServiceRegistry().addService(new GreeterImpl());

    GreeterGrpc.GreeterBlockingStub blockingStub =
        GreeterGrpc.newBlockingStub(grpcServerRule.getChannel());
    String testName = "test name";

    HelloReply reply = blockingStub.sayHello(HelloRequest.newBuilder().setName(testName).build());

    assertEquals("Hello " + testName, reply.getMessage());
  }
}

如果您按照此处描述的方式克隆示例存储库,您可以找到其他示例:

https://grpc.io/docs/tutorials/basic/java.html

希望对你也有帮助。

Br, 雷纳托

@RunWith(JUnit4.class)
public class HelloWorldServerTest {
  /**
   * This rule manages automatic graceful shutdown for the registered servers and channels at the
   * end of test.
   */
  @Rule
  public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();

  /**
   * To test the server, make calls with a real stub using the in-process channel, and verify
   * behaviors or state changes from the client side.
   */
  @Test
  public void greeterImpl_replyMessage() throws Exception {
    // Generate a unique in-process server name.
    String serverName = InProcessServerBuilder.generateName();

    // Create a server, add service, start, and register for automatic graceful shutdown.
    grpcCleanup.register(InProcessServerBuilder
        .forName(serverName).directExecutor().addService(new GreeterImpl()).build().start());

    GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(
        // Create a client channel and register for automatic graceful shutdown.
        grpcCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()));


    HelloReply reply =
        blockingStub.sayHello(HelloRequest.newBuilder().setName( "test name").build());

    assertEquals("Hello test name", reply.getMessage());
  }
}

https://github.com/grpc/grpc-java/blob/master/examples/src/test/java/io/grpc/examples/helloworld/HelloWorldServerTest.java

首先,重构代码以便更容易进行单元测试:

public class RpcTrackDataServiceImpl implements TrackDataServiceGrpc.TrackDataService {
  @Override
  public void getTracks(GetTracksRequest request, StreamObserver<GetTracksResponse> responseObserver) {
    GetTracksResponse reply = getTracks(request);

    responseObserver.onNext(reply);
    responseObserver.onCompleted();
  }

  @VisibleForTesting
  GetTracksResponse getTracks(GetTracksRequest request) {
    return GetTracksResponse
        .newBuilder()
        .addTracks(TrackInfo.newBuilder()
            .setOwner("test")
            .setTrackName("test")
            .build())
        .build();
  }
}

然后可以为每个编写小测试(如果使用 Spring 启动更容易):

public class UnitTest {
  private final ApplicationContextRunner applicationContextRunner = new ApplicationContextRunner();

  @Configuration
  public static class GetTracksConfiguration {
    @Bean
    public GetTracksService getTracksService() {
      return new GetTracksService();
    }
  }

  @Test
  public void replyShouldBeSent() {
    final GetTracksRequest request = GetTracksRequest.newBuilder().build();
    final StreamObserver<GetTracksResponse> response = mock(StreamObserver.class);

    applicationContextRunner
        .withUserConfiguration(RequestTracksConfiguration.class)
        .run(context -> {
          assertThat(context) 
              .hasSingleBean(RequestTracksService.class);
          context.getBean(RequestTracksService.class)
              .getTracks(request, response);

          verify(response, times(1)).onNext(any(GetTracksResponse.class));
          verify(response, times(1)).onCompleted();
          verify(response, never()).onError(any(Throwable.class));
        });
  }

  @Test
  public void shouldTestLogic {
    assertLogicInFactoredOutMethodIsCorrect();
  }

较大的测试应该只测试启动和接线:

@RunWith(SpringRunner.class)
@SpringBootTest(
    classes = {GetTracksService.class}
)
@EnableAutoConfiguration
public class SmokeTest {
  private GetTracksServiceGrpc.GetTracksServiceBlockingStub blockingStub;

  @Test
  public void springClientConnects() {
    final GetTracksRequest request = GetTracksRequest.newBuilder()
        .build();

    assertNotNull(blockingStub.getTracks(request));
  }
}

Note: The above code may not work OOTB since I've left out some annotations we use internally. The major point is there's no need to pay for the cost of bringing up a server for unit tests that are meant to test logic.