rpc.ServerCodec 还在服务吗?

rpc.ServerCodec Still Serving?

我正在执行一些 RPC 测试,偶然发现了一个我似乎无法解决的问题。在我的测试中,我创建了三个独立的 RPC 服务器,我尝试关闭所有这些服务器。然而,在执行我的最后一个测试(TestRpcCodecServerClientComm)时,我的客户端连接似乎正在连接到我启动的第一个 RPC 服务器(我知道这一点是因为我在某些时候将 ID 附加到 RPCHandlers),即使我尝试了一切我可以确保它被关闭了。虽然代码不在那里,但我已经尝试检查每一个错误,但这并没有带来任何结果。

rpc.go

package rbot

import (
    "io"
    "net"
    "net/rpc"
    "net/rpc/jsonrpc"
)

func RpcCodecClientWithPort(port string) (rpc.ClientCodec, error) {
    conn, err := net.Dial("tcp", "localhost:"+port)
    if err != nil {
        return nil, err
    }
    return jsonrpc.NewClientCodec(conn), nil
}

func RpcCodecServer(conn io.ReadWriteCloser) rpc.ServerCodec {
    return jsonrpc.NewServerCodec(conn)
}

rpc_test.go

package rbot

import (
    "errors"
    "fmt"
    "net"
    "net/rpc"
    "testing"
)

type RPCHandler struct {
    RPCServer net.Listener
    conn      rpc.ServerCodec
    done      chan bool
    TestPort  string
    stop      bool
    GotRPC    bool
}

func (r *RPCHandler) SetupTest() {
    r.stop = false
    r.GotRPC = false
    r.done = make(chan bool)
    r.TestPort = "5556"
}

// TODO: Create separate function to handle erroring
func (r *RPCHandler) CreateRPCServer() error {
    rpc.RegisterName("TestMaster", TestAPI{r})

    var err error
    r.RPCServer, err = net.Listen("tcp", ":"+r.TestPort)

    if err != nil {
        return err
    }

    go func() {
        for {
            conn, err := r.RPCServer.Accept()
            if err != nil || r.stop {
                r.done <- true
                return
            }
            r.conn = RpcCodecServer(conn)
            rpc.ServeCodec(r.conn)
        }
    }()
    return nil
}

func (r *RPCHandler) CloseRPCServer() error {
    r.stop = true
    if r.conn != nil {
        err := r.conn.Close()
        if err != nil {
            fmt.Println(err)
        }
    }
    err := r.RPCServer.Close()
    <-r.done
    return err
}

type TestAPI struct {
    t *RPCHandler
}

func (tapi TestAPI) Send(msg string, result *string) error {
    if msg == "Got RPC?" {
        tapi.t.GotRPC = true
        return nil
    }
    return errors.New("Didn't receive right message")
}

// Check if we can create and close an RPC server successfully using the RPC server codec.
func TestRpcCodecServer(t *testing.T) {
    r := RPCHandler{}
    r.SetupTest()

    err := r.CreateRPCServer()
    if err != nil {
        t.Fatalf("Could not create rpc server! %s:", err.Error())
    }

    err = r.CloseRPCServer()
    if err != nil {
        t.Fatalf("Could not close RPC server! %s:", err.Error())
    }
}

// Check if we can create a client without erroring.
func TestRpcCodecClientWithPortt(t *testing.T) {
    r := RPCHandler{}
    r.SetupTest()
    r.CreateRPCServer()
    defer r.CloseRPCServer()

    RPCClient, err := RpcCodecClientWithPort(r.TestPort)
    defer RPCClient.Close()
    if err != nil {
        t.Fatalf("Could not create an RPC client! %s:", err.Error())
    }
}

// Let's double check and make sure our server and client can speak to each other
func TestRpcCodecServerClientComm(t *testing.T) {
    r := RPCHandler{}
    r.SetupTest()
    r.CreateRPCServer()
    defer r.CloseRPCServer()

    RPCCodec, _ := RpcCodecClientWithPort(r.TestPort)
    RPCClient := rpc.NewClientWithCodec(RPCCodec)
    defer RPCClient.Close()

    var result string
    err := RPCClient.Call("TestMaster.Send", "Got RPC?", &result)
    if err != nil {
        t.Fatalf("Error while trying to send RPC message: %s", err.Error())
    }

    if !r.GotRPC {
        t.Fatalf("Could not send correct message over RPC")
    }
}

不确定我是否只是对连接处理不当或类似的问题,我们将不胜感激。

记录 RPC api 确实收到了正确的字符串消息

虽然不是问题的根源,但您的测试配置有一些竞争条件,您应该在它们导致问题之前处理这些竞争条件。始终检查 -race 选项是否存在问题。您还应该让 OS 分配端口,这样您就不会 运行 发生冲突。例如,请参阅 httptest.Server 的工作原理。

你在这里的失败是你没有为每个测试创建一个新的 rpc.Server,你在重复使用 rpc.DefaultServer。对 CreateRPCServer 的第一次调用在名称 TestMaster 下注册了一个 TestAPI。每个后续调用都使用已注册的实例。

如果您每次设置测试并注册新的 TestAPI 时都创建一个新的 rpc.Server,则最终测试将通过。

srv := rpc.NewServer()
srv.RegisterName("TestMaster", testAPI)

...
// and then use srv to handle the new connection
srv.ServeCodec(RpcCodecServer(conn))