ZeroMQ 上的 MS Bond 序列化运行时 "EndOfStreamException" 错误

Runtime "EndOfStreamException" error in MS Bond serialization over ZeroMQ

首先,值得一提的是,在单个 F# 解决方案中,Bond 消息的序列化和反序列化工作正常。但是,我无法正确处理通过 ZeroMQ 发送 and/or 接收消息。

以下程序的订阅者端存在运行时错误。 .bond 文件是用 bond 编译器定义和编译的。然后从 C# 创建一个 dll,以便从 F# 调用。然后我有两个 F# 程序。一个通过 tcp 套接字发布序列化数据,另一个是订阅者。当在 sub 上收到消息时,尝试解组原始数据的行是导致运行时错误的行。谁能看出这是什么原因?

[编辑] 根据 Fyodor 的评论,我在发布者端进行了更改,从而更改了订阅者端的错误。所以这个错误可能与我打包和解包信息的方式有关。

这是 .bond 文件

namespace Examples

struct Record
{
    0: map<string, double> payload;
}

这是发布者:

// publisher

open System
open Bond
open Bond.Protocols
open Bond.IO.Safe
open ZeroMQ

let ctx = new ZContext()
let publisher = new ZSocket(ctx, ZSocketType.PUB)
publisher.Bind("tcp://*:5556")

let src = new Examples.Record()
src.payload.Add("a", 1.)
src.payload.Add("b", 2.)

let output = new OutputBuffer()
let writer = new CompactBinaryWriter<OutputBuffer>(output)

while true do
    Marshal.To(writer, src)
    //let input = new InputBuffer(output.Data)
    //let byteArr = input.ReadBytes(int(input.Length - 1L))
    let updateFrame = new ZFrame(System.Text.Encoding.ASCII.GetString output.Data.Array)
    publisher.Send(updateFrame)

这是订阅者:

// subscriber

open Bond
open Bond.Protocols
open Bond.IO.Safe
open System
open System.Text
open ZeroMQ

let ctx = new ZContext()
let subscriber = new ZSocket(ctx, ZSocketType.SUB)
subscriber.Connect("tcp://127.0.0.1:5556")
subscriber.SubscribeAll()

let output = new OutputBuffer()    
while true do    
    let received = subscriber.ReceiveFrame()
    let byteArr = Encoding.ASCII.GetBytes (received.ReadString())
    let arrSeg = ArraySegment<byte>(byteArr)
    let input = new InputBuffer(arrSeg)
    let dst = Unmarshal<Examples.Record>.From(input)
    for KeyValue(k, v) in dst.payload do
        printfn "%A %A" k v

在接收方,当您尝试将编组的 Bond Compact Binary 解码为 ASCII 字符串时,您将丢失一些有效负载。当将像 Record 这样的结构编组为 Compact Binary 时,有效负载的前四个字节是 0x43 0x42 0x10 0x00。从 ZFrame 读取字符串时,遇到的 the first embedded NUL (0x00) 表示字符串结束,无论帧的大小如何。因此,读取方只看到 0x43 0x42 0x10 而不是整个有效负载(我测试时为 29 个字节)。

由于 Compact Binary 是一个 binary 协议,您需要使用 ZFrame 构造函数,它在发布者端获取缓冲区:

let updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count)

在订阅者端,您只需要读取缓冲区:

let byteArr = received.Read()

此外,在发布者方面,您不断地将数据累积到同一个 OutputBuffer 中。在将下一条记录编组到 re-use 缓冲区而不是增长它之前,您需要将 output.Position 重置为 0:

while true do  
    Marshal.To(writer, src)
    let updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count)output.Data.Array)
    publisher.Send(updateFrame)
    output.Position <- 0

另一件需要注意的事情:分配给 OutputBuffer 的默认缓冲区是 65KiB。一旦您知道您的有效负载将有多大,请考虑将其变小。

注意:我在具有相似语义的 C# 应用程序中对此进行了调试。这是我使用的:

namespace so_q_zmq
{
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Bond;
    using Bond.IO.Safe;
    using Bond.Protocols;
    using ZeroMQ;

    [Schema]
    class Record
    {
        [Id(0)]
        public Dictionary<string, double> payload = new Dictionary<string, double>();
    }

    class Program
    {
        static void Main(string[] args)
        {
            var pTask = Task.Run(() =>
            {
                try
                {
                    Publisher();
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Publisher failed: {0}", ex);
                }
            });

            var sTask = Task.Run(() =>
            {
                try
                {
                    Subscriber();
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Subscriber failed: {0}", ex);
                }
            });

            Task.WaitAll(pTask, sTask);
            Console.WriteLine("Done");
            Console.ReadLine();
        }

        static void Publisher()
        {
            var ctx = new ZContext();
            var publisher = new ZSocket(ctx, ZSocketType.PUB);
            publisher.Bind("tcp://127.0.0.1:12345");

            var src = new Record();
            src.payload.Add("a", 1.0);
            src.payload.Add("b", 2.0);

            var output = new OutputBuffer();
            var writer = new CompactBinaryWriter<OutputBuffer>(output);

            for (;;)
            {
                Marshal.To(writer, src);
                // INCORRECT:
                // var str = Encoding.ASCII.GetString(output.Data.Array);
                // var updateFrame = new ZFrame(str);
                var updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count);
                publisher.Send(updateFrame);
                output.Position = 0;
            }
        }

        static void Subscriber()
        {
            var ctx = new ZContext();
            var subscriber = new ZSocket(ctx, ZSocketType.SUB);
            subscriber.Connect("tcp://127.0.0.1:12345");
            subscriber.SubscribeAll();

            for (;;)
            {
                var received = subscriber.ReceiveFrame();
                // INCORRECT
                // var str = received.ReadString();
                // var byteArr = Encoding.ASCII.GetBytes(str);
                var byteArr = received.Read();
                var arrSeg = new ArraySegment<byte>(byteArr); // There's an InputBuffer ctor that takes a byte[] directly
                var input = new InputBuffer(arrSeg);
                var dst = Unmarshal<Record>.From(input);
                foreach (var kvp in dst.payload)
                {
                    Console.WriteLine("{0} {1}", kvp.Key, kvp.Value);
                }
            }
        }
    }
}