如何去除自定义 ReadOnlySequence 实现末尾的零字节(trim trailing zeros)?

How do I trim zeros at the end of a custom ReadOnlySequence implementation?

WebSocket 服务器需要接收 JSON 字符串。_webSocket.SendAsync 发送的是 ReadOnlyMemory<byte>。问题在于 sequence.First 的末尾带有多余的零字节,这些尾随零会使发送出去的 JSON 消息无效。请问我该如何去掉(trim)它们?

用法

var request = new JsonRpcRequest<object>
{
    JsonRpc = "2.0",
    Id = 1,
    Method = "public/get_instruments",
    Params = @params
};

var message = JsonSerializer.Serialize(request);

// MemoryPool<T>.Rent only guarantees a buffer of AT LEAST the requested size, so the rented
// memory may be longer than the encoded message. Sending the whole buffer would append
// trailing zero bytes to the JSON text and make it invalid.
//
// No `using` on the rented buffer: ownership is handed to `seq` below, and the consumer
// disposes the payload after sending it. Disposing here as well could return the buffer
// to the pool while it is still queued for sending.
var buffer = MemoryPool<byte>.Shared.Rent(Encoding.UTF8.GetByteCount(message));
var bytesWritten = Encoding.UTF8.GetBytes(message, buffer.Memory.Span);

var seq = new OwnedMemorySequence<byte>();
// Append only the bytes that were actually written; the slice still disposes the rented buffer.
seq.Append(buffer.Slice(0, bytesWritten));

var msg = new ChannelWebSocket.Message
{
    MessageType = WebSocketMessageType.Text,
    Payload = seq
};

await client.Output.WriteAsync(msg).ConfigureAwait(false);

代码

/// <summary>
/// Drains the output channel and sends each queued message over the WebSocket,
/// one frame per sequence segment, then closes the socket once the channel completes.
/// </summary>
/// <param name="cancellationToken">
/// Cancels waiting for new messages as well as any in-flight send or close operation.
/// </param>
private async Task OutputLoopAsync(CancellationToken cancellationToken)
{
    // Pass the token so the loop can be cancelled while idle, not only during a send.
    await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
    {
        try
        {
            var sequence = message.Payload.ReadOnlySequence;
            if (sequence.IsEmpty)
                continue;

            // Every segment except the last is sent as a non-final frame.
            while (!sequence.IsSingleSegment)
            {
                await _webSocket.SendAsync(sequence.First, message.MessageType, false, cancellationToken);
                sequence = sequence.Slice(sequence.First.Length);
            }

            // The remaining single segment completes the message (endOfMessage: true).
            await _webSocket.SendAsync(sequence.First, message.MessageType, true, cancellationToken);
        }
        finally
        {
            // Dispose the payload on every path: empty payloads (the `continue` above)
            // and failed sends previously skipped this call.
            message.Payload.Dispose();
        }
    }

    await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
}

/// <summary>
/// An outgoing WebSocket message: the frame type together with the payload to send.
/// </summary>
public sealed class Message
{
    /// <summary>The WebSocket message type (for example text or binary) to send the payload as.</summary>
    public WebSocketMessageType MessageType { get; set; }

    /// <summary>
    /// The payload bytes. Declared non-nullable; callers are expected to assign it
    /// via an object initializer before the message is used.
    /// </summary>
    public OwnedMemorySequence<byte> Payload { get; set; } = default!;
}

/// <summary>
/// A <see cref="MemorySequence{T}"/> that also keeps track of the <see cref="IMemoryOwner{T}"/>
/// instances backing its segments. Each appended owner is added to an internal disposable
/// collection, which is disposed when this object is disposed.
/// </summary>
/// <typeparam name="T">Element type of the underlying memory.</typeparam>
public sealed class OwnedMemorySequence<T> : IDisposable
{
    private readonly CollectionDisposable _disposable = new();
    private readonly MemorySequence<T> _sequence = new();

    /// <summary>A read-only view over all memory appended so far.</summary>
    public ReadOnlySequence<T> ReadOnlySequence => _sequence.ReadOnlySequence;

    /// <summary>
    /// Registers <paramref name="memoryOwner"/> for disposal and appends its memory as the next segment.
    /// </summary>
    /// <param name="memoryOwner">Owner whose <see cref="IMemoryOwner{T}.Memory"/> is appended in full.</param>
    /// <returns>This instance, so calls can be chained.</returns>
    public OwnedMemorySequence<T> Append(IMemoryOwner<T> memoryOwner)
    {
        _disposable.Add(memoryOwner);

        var segmentMemory = memoryOwner.Memory;
        _sequence.Append(segmentMemory);

        return this;
    }

    /// <summary>
    /// Builds a read-only sequence that starts at <paramref name="firstBufferStartIndex"/> within the
    /// first segment and ends at <paramref name="lastBufferEndIndex"/> within the last segment.
    /// </summary>
    public ReadOnlySequence<T> CreateReadOnlySequence(int firstBufferStartIndex, int lastBufferEndIndex)
        => _sequence.CreateReadOnlySequence(firstBufferStartIndex, lastBufferEndIndex);

    /// <summary>Disposes the collection holding the appended memory owners.</summary>
    public void Dispose() => _disposable.Dispose();
}

/// <summary>
/// Extension methods that expose a sub-range of an <see cref="IMemoryOwner{T}"/> as another
/// <see cref="IMemoryOwner{T}"/>. Disposing the returned slice disposes the original owner.
/// </summary>
public static class MemoryOwnerSliceExtensions
{
    /// <summary>
    /// Returns an owner whose memory is <paramref name="length"/> elements of
    /// <paramref name="owner"/> starting at <paramref name="start"/>.
    /// </summary>
    /// <returns>
    /// <paramref name="owner"/> itself when the range covers its whole memory; otherwise a wrapper.
    /// </returns>
    public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> owner, int start, int length)
    {
        // Short-circuit keeps owner.Memory from being read unless start is zero.
        var coversEverything = start == 0 && length == owner.Memory.Length;
        return coversEverything ? owner : new SliceOwner<T>(owner, start, length);
    }

    /// <summary>
    /// Returns an owner whose memory is everything in <paramref name="owner"/> from
    /// <paramref name="start"/> to the end.
    /// </summary>
    /// <returns>
    /// <paramref name="owner"/> itself when <paramref name="start"/> is zero; otherwise a wrapper.
    /// </returns>
    public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> owner, int start)
        => start != 0 ? new SliceOwner<T>(owner, start) : owner;

    /// <summary>Wraps an owner and exposes only a sub-range of its memory.</summary>
    private sealed class SliceOwner<T> : IMemoryOwner<T>
    {
        private readonly IMemoryOwner<T> _owner;

        public SliceOwner(IMemoryOwner<T> owner, int start, int length)
        {
            _owner = owner;
            var whole = _owner.Memory;
            Memory = whole.Slice(start, length);
        }

        public SliceOwner(IMemoryOwner<T> owner, int start)
        {
            _owner = owner;
            var whole = _owner.Memory;
            Memory = whole.Slice(start);
        }

        /// <summary>The sliced view computed at construction time.</summary>
        public Memory<T> Memory { get; }

        /// <summary>Disposes the wrapped owner.</summary>
        public void Dispose() => _owner.Dispose();
    }
}

/// <summary>
/// Builds a <see cref="ReadOnlySequence{T}"/> out of appended memory blocks by linking them
/// into a chain of <see cref="ReadOnlySequenceSegment{T}"/> nodes.
/// </summary>
/// <typeparam name="T">Element type of the memory blocks.</typeparam>
public sealed class MemorySequence<T>
{
    private MemorySegment? _head;
    private MemorySegment? _tail;

    /// <summary>
    /// A sequence spanning every appended block in full, or an empty sequence when nothing was appended.
    /// </summary>
    public ReadOnlySequence<T> ReadOnlySequence
    {
        get
        {
            var endIndex = _tail is null ? 0 : _tail.Memory.Length;
            return CreateReadOnlySequence(0, endIndex);
        }
    }

    /// <summary>Appends <paramref name="buffer"/> as the next segment of the chain.</summary>
    /// <returns>This instance, so calls can be chained.</returns>
    public MemorySequence<T> Append(ReadOnlyMemory<T> buffer)
    {
        if (_tail is { } last)
        {
            _tail = last.Append(buffer);
        }
        else
        {
            // First block: it is both the head and the tail, with running index 0.
            var first = new MemorySegment(buffer, 0);
            _head = first;
            _tail = first;
        }

        return this;
    }

    /// <summary>
    /// Creates a sequence from <paramref name="firstBufferStartIndex"/> in the first segment up to
    /// <paramref name="lastBufferEndIndex"/> in the last segment. When nothing has been appended,
    /// the indices are ignored and an empty sequence is returned.
    /// </summary>
    public ReadOnlySequence<T> CreateReadOnlySequence(int firstBufferStartIndex, int lastBufferEndIndex)
    {
        if (_tail is null)
            return new ReadOnlySequence<T>(Array.Empty<T>());

        return new ReadOnlySequence<T>(_head!, firstBufferStartIndex, _tail, lastBufferEndIndex);
    }

    /// <summary>One node of the segment chain.</summary>
    private sealed class MemorySegment : ReadOnlySequenceSegment<T>
    {
        public MemorySegment(ReadOnlyMemory<T> memory, long runningIndex)
        {
            RunningIndex = runningIndex;
            Memory = memory;
        }

        /// <summary>
        /// Links a new segment after this one; its running index is this segment's running index
        /// plus this segment's length.
        /// </summary>
        public MemorySegment Append(ReadOnlyMemory<T> nextMemory)
        {
            var offsetOfNext = RunningIndex + Memory.Length;
            var appended = new MemorySegment(nextMemory, offsetOfNext);
            Next = appended;
            return appended;
        }
    }
}

你不需要 trim,只需要对内存(Memory)做相应的切片(slice)即可。

注意,MemoryPool<T>.Rent(引自官方文档)返回“[...] 一个能够容纳至少 minBufferSize 个 T 类型元素的内存块”。这里的关键是“至少”二字:返回的内存允许大于所请求的大小。

你需要做的是:如果租用到的内存恰好大于请求的大小,就先从中切出一个请求大小的切片(slice),然后再把这个切片添加到 OwnedMemorySequence 的 _sequence 成员中。