Rx.Net Message Parser

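I'm trying to parse an incoming stream of bytes that represents messages. I need to split the stream and create a message structure for each part.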

A message always starts with 0x81 (BOM) and ends with 0x82 (EOM).

start:  0x81
header: 3 bytes
data:   arbitrary length
stop:   0x82

The data part is escaped using the escape byte 0x1B (ESC): whenever a byte in the data part is one of the control bytes {ESC, BOM, EOM}, it is prefixed with ESC.

The header part is not escaped and may contain control bytes.
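To make the framing rules concrete, here is a minimal sketch of the sending side. `MessageFramer` and `Frame` are hypothetical names used for illustration only; they are not part of Rx.Net or of my actual code, and the sketch assumes the header is always exactly 3 bytes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (an assumption for illustration): builds the wire
// format described above from a header and an unescaped data payload.
public static class MessageFramer
{
    public const byte BOM = 0x81;
    public const byte EOM = 0x82;
    public const byte ESC = 0x1B;

    public static byte[] Frame(byte[] header, byte[] data)
    {
        if (header == null) throw new ArgumentNullException(nameof(header));
        if (data == null) throw new ArgumentNullException(nameof(data));
        if (header.Length != 3)
            throw new ArgumentException("Header must be exactly 3 bytes.", nameof(header));

        var framed = new List<byte> { BOM };
        framed.AddRange(header);      // the header is copied verbatim, never escaped
        foreach (var b in data)
        {
            if (b == BOM || b == EOM || b == ESC)
                framed.Add(ESC);      // control bytes in the data part get an ESC prefix
            framed.Add(b);
        }
        framed.Add(EOM);
        return framed.ToArray();
    }
}
```

For instance, `MessageFramer.Frame(new byte[] { 0x01, 0x02, 0x03 }, new byte[] { 0x82 })` yields the bytes 81 01 02 03 1B 82 82.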

I would like to code this in a functional reactive style with Rx.Net, by consuming an IObservable<byte> and transforming it into an IObservable<Message>.

What is the most idiomatic way to do this?

Some examples:

[81 01 02 03 82] single message
[81 82 81 82 82] single message, header = [82 81 82]
[81 01 02 1B 82] single message, header = [01 02 1B].
[81 01 02 03 1B 82 82] single message, header = [01 02 03], (unescaped) data = [82]
[81 01 02 03 1B 1B 82 82] single message + dangling [82] which should be ignored.
                          header = [01 02 03], (unescaped) data = [1B]

Here is a state machine diagram:
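As a textual stand-in for the diagram, the transitions implied by the description above can be sketched as a pure function. The state names and the `Next` signature are assumptions made for this sketch, and it also assumes that an unescaped BOM inside the data part (a case the description does not cover) is simply kept as data.

```csharp
using System;

// Hypothetical state names, reconstructed from the prose description.
public enum ParserState { WaitingForBom, Header, Data, Escaped }

public static class ParserStateMachine
{
    // headerBytesSeen is the number of header bytes consumed before b;
    // the caller is expected to track it while in the Header state.
    public static ParserState Next(ParserState state, int headerBytesSeen, byte b)
    {
        switch (state)
        {
            case ParserState.WaitingForBom:
                // Anything other than BOM (e.g. a dangling EOM) is ignored.
                return b == 0x81 ? ParserState.Header : ParserState.WaitingForBom;
            case ParserState.Header:
                // Header bytes are taken as-is, control bytes included.
                return headerBytesSeen + 1 < 3 ? ParserState.Header : ParserState.Data;
            case ParserState.Data:
                if (b == 0x1B) return ParserState.Escaped;       // ESC: next byte is literal
                if (b == 0x82) return ParserState.WaitingForBom; // EOM: message complete
                return ParserState.Data;
            case ParserState.Escaped:
                // The escaped byte is literal data, whatever its value.
                return ParserState.Data;
            default:
                throw new ArgumentOutOfRangeException(nameof(state));
        }
    }
}
```

Folding `Next` over [81 01 02 03 1B 1B 82 82] visits Header, Header, Header, Data, Escaped, Data, WaitingForBom, WaitingForBom, so the trailing 82 is ignored as in the last example.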

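You can just use the basic building blocks: Observable.Create and Subscribe. First, let's grab some code that helps us read a stream as an observable of byte[] (there are many different examples of how to do that):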

static class Extensions {
    public static IObservable<byte[]> AsyncRead(this Stream stream, int bufferSize) {           
        var buffer = new byte[bufferSize];            
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead,
            stream.EndRead);
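        return Observable.While(
            () => stream.CanRead,
            Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
                .Select(readBytes => buffer.Take(readBytes).ToArray()))
            // A read of 0 bytes signals end of stream; stop instead of looping forever.
            .TakeWhile(chunk => chunk.Length > 0);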
    }
}

Then define the message class:

class Message {
    public byte[] Header { get; set; }
    public byte[] Body { get; set; }
}

Then put it all into a small console application:

    static void Main(string[] args) {
        // original stream
        var stream = new MemoryStream(new byte[] { 0x81, 0x01,0x02,0x03,0x1B,0x1B,0x82,0x82});
        // your initial IObservable<byte[]>
        IObservable<byte[]> bytes = stream.AsyncRead(128); // or any other buffer size
        // your IObservable<Message>
        var observable = Observable.Create<Message>(observer => {
            // some crude parsing code for the sake of example
            bool nextIsEscaped = false;
            bool readingHeader = false;
            bool readingBody = false;
            List<byte> body = new List<byte>();
            List<byte> header = new List<byte>();
            return bytes.Subscribe(buffer => {
                foreach (var b in buffer) {
                    if (b == 0x81 && !nextIsEscaped && !readingHeader) {
                        // start
                        readingHeader = true;
                        readingBody = false;
                        nextIsEscaped = false;
                    }
                    else if (b == 0x82 && !nextIsEscaped && !readingHeader) {
                        // end
                        readingHeader = false;
                        readingBody = false;
                        if (header.Count > 0 || body.Count > 0) {
                            observer.OnNext(new Message() {
                                Header = header.ToArray(),
                                Body = body.ToArray()
                            });
                            header.Clear();
                            body.Clear();
                        }
                        nextIsEscaped = false;
                    }
                    else if (b == 0x1B && !nextIsEscaped && !readingHeader) {
                        nextIsEscaped = true;
                    }
                    else {
                        if (readingHeader) {
                            header.Add(b);
                            if (header.Count == 3) {
                                readingHeader = false;
                                readingBody = true;
                            }
                        }
                        else if (readingBody)
                            body.Add(b);
                        nextIsEscaped = false;
                    }
                }
            });
        });
        observable.Subscribe(msg =>
        {
            Console.WriteLine("Header: " + BitConverter.ToString(msg.Header));
            Console.WriteLine("Body: " + BitConverter.ToString(msg.Body));
        });
        Console.ReadKey();
    }  
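The question asks for an IObservable<byte> as input, whereas AsyncRead above produces chunks as IObservable<byte[]>. If a per-byte stream is preferred, one option is to flatten the chunks with SelectMany. This is only a sketch: `ByteStreamExtensions` and `Flatten` are hypothetical names, and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper (not part of Rx.Net itself): turns a stream of
// byte chunks into a stream of individual bytes, preserving order.
public static class ByteStreamExtensions
{
    public static IObservable<byte> Flatten(this IObservable<byte[]> chunks)
    {
        // The cast selects the IEnumerable-based SelectMany overload explicitly.
        return chunks.SelectMany(chunk => (IEnumerable<byte>)chunk);
    }
}
```

With this in place, `stream.AsyncRead(128).Flatten()` would give a byte-by-byte observable that a parser taking IObservable<byte> can consume.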

If you are looking for something "more functional", then this might help, although @Evk's answer also passes these tests.

First, may I suggest that, to help people provide verifiable answers, you supply a test suite for a complex problem like this one.

Something like this would be very helpful.

var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable<byte>(
    ReactiveTest.OnNext<byte>(01,0x81), //BOM m1
    ReactiveTest.OnNext<byte>(02,0x01), 
    ReactiveTest.OnNext<byte>(03,0x02), 
    ReactiveTest.OnNext<byte>(04,0x03), 
    ReactiveTest.OnNext<byte>(05,0x82), //EOM m1
    ReactiveTest.OnNext<byte>(06,0x81), //BOM m2
    ReactiveTest.OnNext<byte>(07,0x82), 
    ReactiveTest.OnNext<byte>(08,0x81), 
    ReactiveTest.OnNext<byte>(09,0x82), 
    ReactiveTest.OnNext<byte>(10,0x82), //EOM m2
    ReactiveTest.OnNext<byte>(11,0x81), //BOM m3
    ReactiveTest.OnNext<byte>(12,0x01),     
    ReactiveTest.OnNext<byte>(13,0x02), 
    ReactiveTest.OnNext<byte>(14,0x1B), 
    ReactiveTest.OnNext<byte>(15,0x82), //EOM m3
    ReactiveTest.OnNext<byte>(16,0x81), //BOM m4
    ReactiveTest.OnNext<byte>(17,0x01), 
    ReactiveTest.OnNext<byte>(18,0x02), 
    ReactiveTest.OnNext<byte>(19,0x03), 
    ReactiveTest.OnNext<byte>(20,0x1B), //Control character 
    ReactiveTest.OnNext<byte>(21,0x82), //Data
    ReactiveTest.OnNext<byte>(22,0x82), //EOM m4
    ReactiveTest.OnNext<byte>(23,0x81), //BOM m5
    ReactiveTest.OnNext<byte>(24,0x01), 
    ReactiveTest.OnNext<byte>(25,0x02), 
    ReactiveTest.OnNext<byte>(26,0x03), 
    ReactiveTest.OnNext<byte>(27,0x1B), //Control character 
    ReactiveTest.OnNext<byte>(28,0x1B), //Data
    ReactiveTest.OnNext<byte>(29,0x82), //EOM m5
    ReactiveTest.OnNext<byte>(30,0x82));//Ignored (expected 0x81)

var observer = scheduler.CreateObserver<Message>();

//CurrentAnswer(source)
MyAnswer(source)
    .Subscribe(observer);

scheduler.Start();

ReactiveAssert.AreElementsEqual(
    new[] {
        ReactiveTest.OnNext(05, new Message(){Header=new byte[]{0x01, 0x02, 0x03}, Data=new byte[0]{}}),
        ReactiveTest.OnNext(10, new Message(){Header=new byte[]{0x82, 0x81, 0x82}, Data=new byte[0]{}}),
        ReactiveTest.OnNext(15, new Message(){Header=new byte[]{0x01, 0x02, 0x1B}, Data=new byte[0]{}}),
        ReactiveTest.OnNext(22, new Message(){Header=new byte[]{0x01, 0x02, 0x03}, Data=new byte[]{ 0x82}}),
        ReactiveTest.OnNext(29, new Message(){Header=new byte[]{0x01, 0x02, 0x03}, Data=new byte[]{ 0x1B}}),
    },                                                  
    observer.Messages); 

I also wrote a version of Message that lets me verify the code:

public class Message
{
    public static readonly byte BOM = 0x81;
    public static readonly byte EOM = 0x82;
    public static readonly byte Control = 0x1B;

    public byte[] Header { get; set; }
    public byte[] Data { get; set; }

    public static Message Create(byte[] bytes)
    {   
        if(bytes==null)
            throw new ArgumentNullException(nameof(bytes));
        if(bytes.Length<3)
            throw new ArgumentException("At least 3 header bytes are required.", nameof(bytes));


        var header = new byte[3];
        Array.Copy(bytes, header, 3);

        var body = new List<byte>();
        var escapeNext = false;
        for (int i = 3; i < bytes.Length; i++)
        {
            var b = bytes[i];

            if (b == Control && !escapeNext)
            {
                escapeNext = true;
            }
            else
            {
                body.Add(b);
                escapeNext = false;
            }
        }
        var msg = new Message { Header = header, Data = body.ToArray()};
        return msg;
    }

    public override string ToString()
    {
        return string.Format("Message(Header=[{0}], Data=[{1}])", ByteArrayString(Header), ByteArrayString(Data));
    }

    private static string ByteArrayString(byte[] bytes)
    {
        return string.Join(",", bytes.Select(b => b.ToString("X")));
    }

    public override bool Equals(object obj)
    {
        var other = obj as Message;
        if(other==null)
            return false;
        return Equals(other);
    }

    protected bool Equals(Message other)
    {
        return IsSequenceEqual(Header, other.Header) 
            && IsSequenceEqual(Data, other.Data);
    }

    private bool IsSequenceEqual<T>(IEnumerable<T> expected, IEnumerable<T> other)
    {
        if(expected==null && other==null)
            return true;
        if(expected==null || other==null)
            return false;
        return expected.SequenceEqual(other);
    }

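    public override int GetHashCode()
    {
        unchecked
        {
            // Hash the contents rather than the array references,
            // so that equal messages produce equal hash codes.
            int hash = 17;
            foreach (var b in Header ?? new byte[0]) hash = hash * 397 + b;
            hash = hash * 397 + 0x100; // separator between header and data
            foreach (var b in Data ?? new byte[0]) hash = hash * 397 + b;
            return hash;
        }
    }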
}

Now that I have all the plumbing in place, I can focus on the actual problem.

public static IObservable<Message> MyAnswer(IObservable<byte> source)
{
    return source.Publish(s =>
        {

            return 
                Observable.Defer(()=>
                    //Start consuming once we see a BOM
                    s.SkipWhile(b => b != Message.BOM)
                     .Scan(new Accumulator(), (acc, cur)=>acc.Accumulate(cur))
                )
                .TakeWhile(acc=>!acc.IsEndOfMessage())
                .Where(acc=>!acc.IsBeginingOfMessage())
                .Select(acc=>acc.Value())
                .ToArray()
                .Where(buffer=>buffer.Any())
                .Select(buffer => Message.Create(buffer))
                .Repeat();
        }); 

}
public class Accumulator
{
    private int _index = 0;
    private byte _current =0;
    private bool _isCurrentEscaped = false;
    private bool _isNextEscaped = false;

    public Accumulator Accumulate(byte b)
    {
        _index++;
        _current = b;
        _isCurrentEscaped = _isNextEscaped;
        _isNextEscaped = (!IsHeader() && !_isCurrentEscaped && b==Message.Control);
        return this;
    }
    public byte Value()
    {
        return _current;
    }

    private bool IsHeader()
    {
        return _index < 5;
    }
    public bool IsBeginingOfMessage()
    {
        return _index == 1 && _current == Message.BOM;
    }
    public bool IsEndOfMessage()
    {
        return !IsHeader()
            && _current == Message.EOM 
            && !_isCurrentEscaped;
    }
}

For completeness, here is the core of @Evk's answer, so you can easily swap the implementations in and out.

public static IObservable<Message> CurrentAnswer(IObservable<byte> source)
{
    return Observable.Create<Message>(o =>
    {
        // some crude parsing code for the sake of example
        bool nextIsEscaped = false;
        bool readingHeader = false;
        bool readingBody = false;
        List<byte> body = new List<byte>();
        List<byte> header = new List<byte>();
        return source.Subscribe(b =>
        {
            if (b == 0x81 && !nextIsEscaped && !readingHeader)
            {
                // start
                readingHeader = true;
                readingBody = false;
                nextIsEscaped = false;
            }
            else if (b == 0x82 && !nextIsEscaped && !readingHeader)
            {
                // end
                readingHeader = false;
                readingBody = false;
                if (header.Count > 0 || body.Count > 0)
                {
                    o.OnNext(new Message()
                    {
                        Header = header.ToArray(),
                        Data = body.ToArray()
                    });
                    header.Clear();
                    body.Clear();
                }
                nextIsEscaped = false;
            }
            else if (b == 0x1B && !nextIsEscaped && !readingHeader)
            {
                nextIsEscaped = true;
            }
            else
            {
                if (readingHeader)
                {
                    header.Add(b);
                    if (header.Count == 3)
                    {
                        readingHeader = false;
                        readingBody = true;
                    }
                }
                else if (readingBody)
                    body.Add(b);
                nextIsEscaped = false;
            }

        });
    });

}