RPC 无法解码 TCP 传输的参数

RPC can't decode arguments for TCP transport

我正在基于此页面中的示例开发多线程 RPC 服务器: http://bderzhavets.blogspot.ca/2005/11/multithreaded-rpc-server-in-white-box.html

不幸的是,它并没有完全开箱即用,在追查错误一段时间后,我发现服务器无法解码参数(基于来自 return 的代码squareproc_2)。在函数 serv_request 中调用 squareproc_2_svc 后,服务器端的执行似乎停止了。请参阅下面代码中的 case: SQUAREPROC 来自 square_svc.c

void *serv_request(void *data)
{
    struct thr_data *ptr_data = (struct thr_data *)data;
    {
        square_in argument;
        square_out result;
        bool_t retval;
        xdrproc_t _xdr_argument, _xdr_result;
        bool_t (*local)(char *, void *, struct svc_req *);
        struct svc_req *rqstp = ptr_data->rqstp;
        register SVCXPRT *transp = ptr_data->transp;
        switch (rqstp->rq_proc) {
            case NULLPROC:
                printf("NULLPROC called\n");
                (void) svc_sendreply (transp, (xdrproc_t) xdr_void, (char *)NULL);
                return;
            case SQUAREPROC:
                _xdr_argument = (xdrproc_t) xdr_square_in;
                _xdr_result = (xdrproc_t) xdr_square_out;
                printf("_xdr_result = %ld\n",_xdr_result);
                local = (bool_t (*) (char *, void *,  struct svc_req *))squareproc_2_svc;
                break;
            default:
                printf("default case executed");
                svcerr_noproc (transp);
                return;
        }
        memset ((void *)&argument, 0, sizeof (argument));
        if (!svc_getargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
            printf("svc_getargs failed");
            svcerr_decode (transp);
            return;
        }
        retval = (bool_t) (*local)((char *)&argument, (void *)&result, rqstp);
        printf("serv_request result: %d\n",retval);
        if (retval > 0 && !svc_sendreply(transp, (xdrproc_t) _xdr_result, (char *)&result))
        {
            printf("something happened...\n");
            svcerr_systemerr (transp);
        }
        if (!svc_freeargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
            fprintf (stderr, "%s", "unable to free arguments");
            exit (1);
        }
        if (!square_prog_2_freeresult (transp, _xdr_result, (caddr_t) &result))
            fprintf (stderr, "%s", "unable to free results");
        return;
    }
}

这是文件 square_server.c:

squareproc_2_svc 的实现
bool_t squareproc_2_svc(square_in *inp,square_out *outp,struct svc_req *rqstp)
{
    printf("Thread id = '%ld' started, arg = %ld\n",pthread_self(),inp->arg1);
    sleep(5);
    outp->res1=inp->arg1*inp->arg1;
    printf("Thread id = '%ld' is done %ld \n",pthread_self(),outp->res1);
    return(TRUE);
}

客户端输出:

yak@AcerPC:~/RPC/multithread_example$ ./ClientSQUARE localhost 2
squareproc_2 called
xdr_square_in result: 1
function call failed; code: 11

服务器端输出:

yak@AcerPC:~/RPC/multithread_example$ sudo ./ServerSQUARE 
creating threads
SQUAREPROC called
xdr_square_in result: 0

如您所见,xdr_square_in return服务器端的结果为 FALSE。 这是 square.x

struct square_in {
    long arg1;
};

struct square_out {
    long res1;
};

program SQUARE_PROG {
    version SQUARE_VERS {
        square_out SQUAREPROC(square_in) = 1;
    } = 2 ;
} = 0x31230000;

和square_xdr.c

/*
 * Please do not edit this file.
 * It was generated using rpcgen.
 */

#include "square.h"

bool_t
xdr_square_in (XDR *xdrs, square_in *objp)
{
    register int32_t *buf;
    int retval;
    if (!xdr_long (xdrs, &objp->arg1)) retval = FALSE;
    else retval = TRUE;
    printf("xdr_square_in result: %d\n",retval);
    return retval;
}

bool_t
xdr_square_out (XDR *xdrs, square_out *objp)
{
    register int32_t *buf;
    int retval;
    if (!xdr_long (xdrs, &objp->res1)) retval = FALSE;
    else retval = TRUE;
    printf("xdr_square_out result: %d\n",retval);
    return retval;
}

我正在使用 Ubuntu 14.04 LTS,使用 rpcgen -a -M 生成存根和 xdr 代码,并使用 gcc.

进行编译

该错误似乎只在使用 TCP 作为传输方法时出现。我可以使用 UDP 作为传输获得结果,但是当来自多个客户端的请求同时到达时,一些调用会失败。我希望能够支持多达 15 个客户端。当我尝试使用 UDP 和 10 个客户端时,10 个调用中有 2 个失败,并且 return 代码与 squareproc_2.

不同

你遇到了一些问题。

从xen页面,当它在square_prog_2中做pthread_create时,它首先调用pthread_attr_setdetachstate,但它需要做pthread_attr_init 之前。此外,attr 似乎是 static/global--put 它在函数的堆栈框架中。

square_prog_2 得到两个参数:rqstp 和 transp。这些被保存到 malloc'ed 结构 data_str [因此每个线程都有自己的副本]。但是,我想知道 rqstp 和 transp 值是多少(例如 printf("%p"))。它们需要不同,否则每个线程在尝试使用它们时会相互冲突 [因此需要 pthread_mutex_lock]。 malloc 不会克隆 rqstp/transp 所以如果它们相同,那就是问题所在,因为您可能有两个线程试图同时在相同的缓冲区上重复。

有一个 return 代码 11。除了一些特殊代码,它看起来很像线程上的 SIGSEGV。 rqstp/transp 重叠完全可以解释这一点。

您可能需要重新设计它,因为我怀疑 XDR 不是线程安全的——也不应该是线程安全的。另外,我不认为 svc_* 是线程 safe/aware.

启动单线程。作为测试,让 square_prog_2 直接调用 serv_request(例如 而不是 pthread_*)。我敢打赌它适用于所有模式。

如果是这样,请抓住你的帽子——使用线程的示例代码被破坏了——充满了竞争条件和段错误等。如果你不喜欢使用线程(不需要这样的灯值班任务x * x),你可以尽情享受

否则,解决方案有点复杂。主线程必须全部 访问套接字和所有XDR parsing/encoding。它不能使用 svc_run--you 必须自己滚动。 child 只能 完成实际工作(例如 x * x)并且可能 不能 触及 socket/req/transp,等等.

主线程:

while (1) {
    if (svc_getreq_poll()) {
        // parse XDR
        // create data/return struct for child thread
        // create thread
        // add struct to list of "in-flight" requests
    }

    forall struct in inflight {
        if (reqdone) {
            // take result from struct
            // encode into XDR
            // do send_reply
            // remove struct from list
        }
    }
}

对于 child 结构,它看起来像:

struct child_struct {
    int num;
    int num_squared;
};

而child的线程函数变成了一个线程:ptr->num_squared = ptr->num * ptr->num

更新:多线程 RPC 服务器似乎 在 Linux 或 FreeBSD

下得到支持

这是一个文档:https://www.redhat.com/archives/redhat-list/2004-June/msg00439.html这是一个更清晰的示例。

由此可见:记住 -Linux 下不支持 rpcgen 的一个选项。库调用 由 SunOS RPC 提供的构建多线程 RPC 服务器在 Linux 下也不可用

这是 Linux rpcgen 手册页:http://linux.die.net/man/1/rpcgen 没有提到 -M。 IMO,这意味着 rpcgen 程序可以选择并生成存根,但底层支持不存在,因此他们将其排除在文档之外。

这是 FreeBSD 手册页 [以及不支持的原因]:http://www.freebsd.org/cgi/man.cgi?query=rpcgen&sektion=1&manpath=FreeBSD+5.0-RELEASE 请参阅其中 -M 的文档:

M -- 生成 multithread-safe 存根以传递参数和结果 rpcgen 生成的代码和用户编写的代码之间。这个选项 对于想要在代码中使用线程的用户很有用。但是,rpc_svc_calls(3) 函数还没有 MT-safe,这 意味着 rpcgen 生成的 server-side 代码不会是 MT-safe.

另一种方式:

为什么要用 RPC/XDR 呢?对于您打算使用的大型阵列,开销是巨大的。大多数标准用途是黄页之类的东西,没有太多数据。

现在大多数系统都是小端。只需将本机缓冲区爆炸到您直接打开的套接字即可。在服务器上,让守护进程进行监听,然后 fork a child 并让 child 进行接受、读入数据、进行计算,并发回回复。在最坏的情况下,child 将需要进行字节序交换,但这很容易使用 bswap_32.

在紧密循环中完成

每个消息开头的一个简单的小控制结构,在任一方向上作为数据有效负载的前缀:

struct msgcontrol {
    int what_i_am;
    int operation_to_perform;
    int payload_length;
    int payload[0];
};

特别说明:我以前在商业上做过这个(例如 MPI 和我自己的滚动),你可能必须发出 setsockopt 调用来增加 kernel 套接字的大小缓冲到足以维持大量数据的东西

实际上,现在我想起来了,如果你不想自己动手,MPI 可能会感兴趣。但是,在使用它之后,我并不是它的忠实粉丝。它出现了意想不到的问题,我们不得不将其删除以直接控制我们的套接字。