Apache Thrift: Multitasking with a single server and client

I have read this and this. However, my case is different: I need neither multiplexed services on the server nor multiple connections to the server.

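Background:
For my big data project, I need to compute the coreset of a given big data set. A coreset is a subset of the data that preserves its most important mathematical relationships.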

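Workflow:

My problem:
The whole process runs as a single thread. The client parses a chunk, waits for the server to finish computing the coreset, then parses another chunk, and so on.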

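Goal:
Make use of multiprocessing. The client parses several chunks at the same time, and for each compute coreset request the server assigns a thread to handle it, where the number of threads is limited. Something like a pool.

I understand that I need to use a different protocol than TSimpleServer and move toward TThreadPoolServer or TThreadedServer. I just can't decide which one to choose, since neither seems to fit my case?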

TThreadedServer spawns a new thread for each client connection, and each thread remains alive until the client connection is closed.


In TThreadPoolServer each client connection gets its own dedicated server thread. The server thread goes back to the thread pool for reuse after the client closes the connection.

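I don't need a thread per connection; I want a single connection, with the server handling multiple service requests at the same time. Visualization: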

Client:
Thread1: parses(chunk1) --> Request compute coreset
Thread2: parses(chunk2) --> Request compute coreset
Thread3: parses(chunk3) --> Request compute coreset

Server: (Pool of 2 threads)
Thread1: Handle compute Coreset
Thread2: handle compute Coreset
.
. 
Thread1 becomes available and handles another compute coreset

Code:
api.thrift:

struct CoresetPoint {
    1: i32 row,
    2: i32 dim,
}

struct CoresetAlgorithm {
    1: string path,
}

struct CoresetWeightedPoint {
    1: CoresetPoint point,
    2: double weight,
}

struct CoresetPoints {
    1: list<CoresetWeightedPoint> points,
}

service CoresetService {

    void initialize(1:CoresetAlgorithm algorithm, 2:i32 coresetSize)

    oneway void compressPoints(1:CoresetPoints message)

    CoresetPoints getTotalCoreset()
}


Server: (implementation removed for readability)

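import logging

from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
from thrift.transport import TSocket, TTransport
# CoresetService comes from the Thrift-generated code; its import path
# depends on where the generated package lives.


class CoresetHandler:
    def initialize(self, algorithm, coresetSize):
        pass  # implementation omitted

    def _add(self, leveledSlice):
        pass  # implementation omitted

    def compressPoints(self, message):
        pass  # implementation omitted

    def getTotalCoreset(self):
        pass  # implementation omitted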


if __name__ == '__main__':
    logging.basicConfig()
    handler = CoresetHandler()
    processor = CoresetService.Processor(handler)
    transport = TSocket.TServerSocket(port=9090)
    tfactory = TTransport.TBufferedTransportFactory()
    pfactory = TBinaryProtocol.TBinaryProtocolFactory()

    server = TServer.TThreadedServer(processor, transport, tfactory, pfactory)

    # You could do one of these for a multithreaded server
    # server = TServer.TThreadedServer(processor, transport, tfactory, pfactory)
    # server = TServer.TThreadPoolServer(processor, transport, tfactory, pfactory)

    print('Starting the server...')
    server.serve()
    print('done.')


Client:

try:
    # Make socket
    transport = TSocket.TSocket('localhost', 9090)

    # Buffering is critical. Raw sockets are very slow
    transport = TTransport.TBufferedTransport(transport)

    # Wrap in a protocol
    protocol = TBinaryProtocol.TBinaryProtocol(transport)

    # Create a client to use the protocol encoder
    client = CoresetService.Client(protocol)

    # Connect!
    transport.open()


    # The data is sliced first. The loop below then walks over all files
    # saved in the directory I specified, parses each one, and invokes
    # client.compressPoints(data).
    SliceFile(...)
    p = CoresetAlgorithm(...)
    client.initialize(p, 200)
    for filename in os.listdir('/home/tony/DanLab/slicedFiles'):
        if filename.endswith(".txt"):
            data = _parse(filename)
            client.compressPoints(data)
    compressedData = client.getTotalCoreset()

    # Close!
    transport.close()

except Thrift.TException as tx:
    print('%s' % (tx.message))

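Question: Is this possible with Thrift? What protocol should I use? I partially solved the problem of the client waiting for the server to finish the computation by adding oneway to the function declaration, which indicates that the client only makes a request and does not wait for any response at all.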

In essence, this is more of an architectural question than a Thrift question. Given that the premises

I don't need a thread per connection, I want a single connection, and the server to handle multiple service requests at the same time. Visualization:

I partially solved the problem of the client waiting for the server to finish the computation by adding oneway to the function declaration, to indicate that the client only makes a request and does not wait for any response at all.

accurately describe the use case, you want this:

+---------------------+
| Client              |
+---------+-----------+
          |
          |
+---------v-----------+
| Server              |
+---------+-----------+
          |
          |
+---------v-----------+          +---------------------+
| Queue<WorkItems>    <----------+ Worker Thread Pool  |
+---------------------+          +---------------------+

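The server's only task is to accept requests and insert them into the work item queue as quickly as possible. These work items are processed by a separate worker thread pool, which is otherwise completely independent of the server part. The only shared part is the work item queue, which of course needs properly synchronized access methods.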
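A minimal sketch of that layout in Python, using only the standard library so it runs without Thrift. The class name QueueingCoresetHandler, the num_workers parameter, and the compress callable (a stand-in for the real coreset computation) are assumptions made for illustration; they do not come from the question's code. The method names compressPoints and getTotalCoreset mirror the service definition, so an object of this shape could be handed to the generated processor.

```python
import queue
import threading


class QueueingCoresetHandler:
    """Hypothetical handler: compressPoints only enqueues, workers do the work."""

    def __init__(self, num_workers=2, compress=sum):
        # `compress` is a placeholder for the real coreset computation.
        self._compress = compress
        self._work_items = queue.Queue()  # the only structure shared with the workers
        self._results = []
        self._results_lock = threading.Lock()
        for _ in range(num_workers):
            threading.Thread(target=self._worker, daemon=True).start()

    def compressPoints(self, message):
        # Runs on the server thread: enqueue and return immediately.
        self._work_items.put(message)

    def _worker(self):
        # Each worker pulls work items until the process exits.
        while True:
            message = self._work_items.get()
            try:
                result = self._compress(message)
                with self._results_lock:
                    self._results.append(result)
            finally:
                self._work_items.task_done()

    def getTotalCoreset(self):
        # Block until every queued work item has been processed.
        self._work_items.join()
        with self._results_lock:
            return list(self._results)


if __name__ == "__main__":
    handler = QueueingCoresetHandler(num_workers=2)
    for chunk in ([1, 2], [3, 4], [5, 6]):
        handler.compressPoints(chunk)
    print(sorted(handler.getTotalCoreset()))  # [3, 7, 11]
```

queue.Queue already provides the synchronized access the queue needs, so only the result list gets its own lock. A real handler would also have to decide what happens when the computation raises, since in this sketch an exception ends the worker thread.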

Regarding the choice of server: if the server hands off requests fast enough, even a TSimpleServer may do.
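The client half of the question (parsing several chunks at once over a single connection) can be sketched in the same spirit. A Thrift client object is generally not safe to share between threads without synchronization, so this sketch parses in parallel but serializes the sends with a lock. FakeClient, parse_chunk, and send_all are hypothetical names; FakeClient stands in for CoresetService.Client and parse_chunk for the question's _parse so that the example runs without Thrift.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeClient:
    """Stand-in for CoresetService.Client (assumption, for illustration only)."""

    def __init__(self):
        self.sent = []

    def compressPoints(self, message):
        self.sent.append(message)


def parse_chunk(filename):
    # Placeholder for the question's _parse(filename).
    return filename.upper()


def send_all(client, filenames, max_workers=3):
    send_lock = threading.Lock()  # one connection, so one writer at a time

    def parse_and_send(filename):
        data = parse_chunk(filename)  # may overlap with other threads
        with send_lock:  # sends on the shared connection are serialized
            client.compressPoints(data)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() waits for all tasks and re-raises any worker exception.
        list(pool.map(parse_and_send, filenames))


if __name__ == "__main__":
    client = FakeClient()
    send_all(client, ["a.txt", "b.txt", "c.txt"])
    print(sorted(client.sent))
```

If parsing is CPU-bound, Python threads will not run it in parallel because of the global interpreter lock; in that case the parsing step could move to a process pool while the sends stay in one thread.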