如何使用 Thrift 代替 RPC 进行消息传递

How to use Thrift for message passing instead of RPC

我知道 Thrift 主要针对成熟的客户端-服务器 RPC,但从高层架构来看,在我看来它也应该完全适用于双向消息传递。

我想在两端(C、.NET Core)构建的内容如下:

我不需要线程服务器,...任何花哨的东西。本质上,我很想掌握什么,例如Protobuffs 提供开箱即用的处理方式,可以在接收端缓冲整个消息和一般的消息框架。

问题是我找不到任何关于如何使用当前库(我个人对 .NET Core 和 C 库)API 开始构建它的文档。我唯一发现的是这个 question 但它并没有真正指向任何资源。

Thrift 是一个 RPC 和序列化框架。这意味着,您也可以只使用没有 RPC 的序列化部分。

与消息系统结合使用的方式通常(大致)如下:

  • 将消息序列化到缓冲区
  • 以任何方式发送缓冲区
  • 接收端反序列化缓冲区并处理数据

如果您打算通过同一个渠道发送不同类型的消息,最好使用包含所有可能消息正文的 union 信封结构:

 struct MessageOne {
      // contents of this message type
 }

 struct MessageTwo {
      // contents of this message type
 }

 struct MessageThree {
      // contents of this message type
 }

 union MyMessageEnvelope {
      1: MessageOne   one
      2: MessageTwo   two
      3: MessageThree  three
      // easily extendable 
 }

为了让它更完美 elegant/reusable,还可以实现自定义传输以满足需求并进一步封装逻辑。 Thrift 的模块化结构使其变得简单(您链接的 post 也指的是那个)。源代码树的 /contrib 文件夹中有一些示例,可以作为起点。

如果您完全不知道从哪里开始:先查看教程,然后查看测试套件程序,两者都是 Thrift 初学者的绝佳学习资源。

做一些非常相似的事情的一些笔记:

  • 同时使用 C#(.Net Core 和 Framework 的混合)和 C++ 的客户端
  • 使用 Thrift RPC 以及 "plain messages" 进行 pub/sub 和一般序列化

很好,因为它可以更轻松地反序列化消息。

鉴于以下节俭:

struct SubscribeRequest {
    1: string topic,
    2: string appid,
}

struct SubscribeReply {
    1: bool success,
    2: string topic,
}
service HttpService {
    HttpSDKDataTypes.SubscribeReply Subscribe(1: HttpSDKDataTypes.SubscribeRequest message),
}

thrift -gen netcore 给你:

  public async Task<Ruyi.SDK.Http.SubscribeReply> SubscribeAsync(Ruyi.SDK.Http.SubscribeRequest message, CancellationToken cancellationToken)
  {
    await OutputProtocol.WriteMessageBeginAsync(new TMessage("Subscribe", TMessageType.Call, SeqId), cancellationToken);

消息标识符包含在 RPC 调用中。如果您不使用 RPC 调用,您会得到 "raw" 结构,但没有指示如何反序列化它们。

将它们放在 union:

union UnionExample {
    1: SubscribeRequest request,
    2: SubscribeReply reply,    
}

为您处理:

public async Task WriteAsync(TProtocol oprot, CancellationToken cancellationToken)
{
  oprot.IncrementRecursionDepth();
  try
  {
    var struc = new TStruct("SubscribeRequest");
    await oprot.WriteStructBeginAsync(struc, cancellationToken);
    var field = new TField();
    if (Topic != null && __isset.topic)
    {
      field.Name = "topic";
      field.Type = TType.String;
      field.ID = 1;

反序列化后,可以用:

    public void handler(UnionExample example)
    {
        if (example.__isset.request)
        {
            SubscribeRequest msg = example.request;
            // ...
        }
        else if (example.__isset.reply)
        {
            SubscribeReply msg = example.reply;
            // ...
        }

检查生成器中可用的选项。 thrift -gen "csharp:union,async" 让我们使用 pattern matching:

   public void handler(UnionExample example)
    {
        switch (example.Data)
        {
            case SubscribeRequest msg:
                //...
            case SubscribeReply msg:
                //...

不幸的是 netcore 生成器在 0.11.0 中不这样做。

thrift github 存储库有 examples of serializing into memory(而不是使用 RPC)。一般来说,它类似于:

Stream stm = new MemoryStream();
TTransport trans = new TStreamTransport(null, stm);
TProtocol prot = new TJSONProtocol(trans);

如果您要创建大量 MemoryStream 个实例,请查看 Microsoft.IO.RecycableMemoryStream

使用自定义 protocol/transport 将简化发送消息的过程,因为它将处理样板(并避免首先序列化到内存的额外对象,然后再使用它)。 thrift contrib/ folder 已被提及。 Here's our C# example 在 ZeroMQ 上使用 Thrift。

最后一点。如果您打算完全使用 RPC 功能,请使用单个结构参数编写您的服务方法。含义:

service HttpService {
    // Do this
    string Subscribe(1: SubscribeRequest message),
    // Not this
    string Subscribe(1: string topic, 2: string appid,),
}

摆脱 RPC 会更容易and/or重用消息。