如何去除自定义 ReadOnlySequence 实现末尾的尾随零?
How do I trim zeros at the end of a custom ReadOnlySequence implementation?
WebSocket 服务器期望收到 JSON 字符串。_webSocket.SendAsync 发送的是 ReadOnlyMemory<byte>。问题在于 sequence.First 带有尾随的零字节,而这些尾随零会导致 JSON 消息无效。我该如何把它们去掉(trim)?
用法
// Build the JSON-RPC request object that will be sent to the server.
var request = new JsonRpcRequest<object>
{
JsonRpc = "2.0",
Id = 1,
Method = "public/get_instruments",
Params = @params
};
// Serialize the request to a JSON string.
var message = JsonSerializer.Serialize(request);
// Rent a pooled buffer, asking for the UTF-8 byte count of the message.
// NOTE(review): Rent only guarantees a minimum size, so buffer.Memory may be longer than the requested count.
// NOTE(review): `using var` disposes the buffer when this scope ends, while seq.Append below also registers it
// for disposal - confirm the buffer is not released before the output loop has sent it.
using var buffer = MemoryPool<byte>.Shared.Rent(Encoding.UTF8.GetByteCount(message));
// Encode the string into the rented buffer; the chars-used, bytes-used and completed out values are discarded.
Encoding.UTF8.GetEncoder().Convert(message, buffer.Memory.Span, true, out _, out _, out _);
var seq = new OwnedMemorySequence<byte>();
// Append passes the whole buffer.Memory to the sequence, including any unused tail beyond the encoded bytes.
seq.Append(buffer);
// Wrap the payload as a text message.
var msg = new ChannelWebSocket.Message
{
MessageType = WebSocketMessageType.Text,
Payload = seq
};
// Hand the message to the client's output (presumably the channel drained by OutputLoopAsync - confirm).
await client.Output.WriteAsync(msg).ConfigureAwait(false);
代码
/// <summary>
/// Reads messages from the output channel and sends each one over the web socket,
/// one frame per sequence segment, then closes the socket with a normal-closure status
/// once the enumeration of the channel ends.
/// </summary>
/// <param name="cancellationToken">
/// Token passed to the channel enumeration, to every send and to the close call.
/// </param>
/// <returns>A task that completes after the socket close call has finished.</returns>
private async Task OutputLoopAsync(CancellationToken cancellationToken)
{
    // Pass the token to the reader so a cancellation also interrupts an idle wait for the next message.
    await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
    {
        // Dispose the payload on every exit path: normal completion, the empty-payload
        // `continue`, and an exception thrown by SendAsync.
        using var payload = message.Payload;

        var sequence = payload.ReadOnlySequence;
        if (sequence.IsEmpty)
        {
            continue;
        }

        // Every segment except the last is sent as a non-final frame.
        while (!sequence.IsSingleSegment)
        {
            await _webSocket.SendAsync(sequence.First, message.MessageType, false, cancellationToken);
            sequence = sequence.Slice(sequence.First.Length);
        }

        // The remaining single segment ends the message.
        await _webSocket.SendAsync(sequence.First, message.MessageType, true, cancellationToken);
    }

    await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
}
/// <summary>
/// An outgoing web-socket message: the frame type together with the payload to send.
/// </summary>
public sealed class Message
{
    /// <summary>The web-socket message type used when the payload is sent.</summary>
    public WebSocketMessageType MessageType { get; set; }

    /// <summary>
    /// The owned byte sequence to send. Initialised with <c>null!</c>, so the creator
    /// is expected to assign it before the message is used.
    /// </summary>
    public OwnedMemorySequence<byte> Payload { get; set; } = null!;
}
/// <summary>
/// A sequence of memory blocks that also tracks the owners of those blocks,
/// so that they can all be released through a single <see cref="Dispose"/> call.
/// </summary>
/// <typeparam name="T">Element type of the memory blocks.</typeparam>
public sealed class OwnedMemorySequence<T> : IDisposable
{
    private readonly CollectionDisposable _disposable = new();
    private readonly MemorySequence<T> _sequence = new();

    /// <summary>A read-only view over everything appended so far.</summary>
    public ReadOnlySequence<T> ReadOnlySequence
    {
        get { return _sequence.ReadOnlySequence; }
    }

    /// <summary>
    /// Registers <paramref name="memoryOwner"/> for disposal and appends its
    /// entire <c>Memory</c> to the end of the sequence.
    /// </summary>
    /// <param name="memoryOwner">Owner whose memory becomes the next segment.</param>
    /// <returns>This instance, to allow chained calls.</returns>
    public OwnedMemorySequence<T> Append(IMemoryOwner<T> memoryOwner)
    {
        _disposable.Add(memoryOwner);
        _sequence.Append(memoryOwner.Memory);
        return this;
    }

    /// <summary>
    /// Creates a read-only view that starts at <paramref name="firstBufferStartIndex"/> within the
    /// first segment and ends at <paramref name="lastBufferEndIndex"/> within the last segment.
    /// </summary>
    public ReadOnlySequence<T> CreateReadOnlySequence(int firstBufferStartIndex, int lastBufferEndIndex)
        => _sequence.CreateReadOnlySequence(firstBufferStartIndex, lastBufferEndIndex);

    /// <summary>Disposes the collection that holds the registered memory owners.</summary>
    public void Dispose() => _disposable.Dispose();
}
/// <summary>
/// Extension methods that expose a sub-range of an <see cref="IMemoryOwner{T}"/> as another owner.
/// </summary>
public static class MemoryOwnerSliceExtensions
{
    /// <summary>
    /// Returns an owner whose memory is the <paramref name="length"/> elements of
    /// <paramref name="owner"/> starting at <paramref name="start"/>.
    /// The original owner is returned unchanged when the range covers its whole memory.
    /// </summary>
    public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> owner, int start, int length)
    {
        bool coversWholeMemory = start == 0 && length == owner.Memory.Length;
        return coversWholeMemory ? owner : new SliceOwner<T>(owner, start, length);
    }

    /// <summary>
    /// Returns an owner whose memory is everything in <paramref name="owner"/> from
    /// <paramref name="start"/> onwards. The original owner is returned unchanged when
    /// <paramref name="start"/> is zero.
    /// </summary>
    public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> owner, int start)
    {
        return start == 0 ? owner : new SliceOwner<T>(owner, start);
    }

    /// <summary>
    /// Owner wrapper that exposes a slice of the wrapped owner's memory and forwards disposal to it.
    /// </summary>
    private sealed class SliceOwner<T> : IMemoryOwner<T>
    {
        private readonly IMemoryOwner<T> _owner;

        public SliceOwner(IMemoryOwner<T> owner, int start, int length)
        {
            _owner = owner;
            Memory = owner.Memory.Slice(start, length);
        }

        public SliceOwner(IMemoryOwner<T> owner, int start)
        {
            _owner = owner;
            Memory = owner.Memory.Slice(start);
        }

        /// <summary>The sliced view over the wrapped owner's memory.</summary>
        public Memory<T> Memory { get; }

        /// <summary>Disposes the wrapped owner.</summary>
        public void Dispose() => _owner.Dispose();
    }
}
/// <summary>
/// Builds a <see cref="ReadOnlySequence{T}"/> out of memory blocks appended one after another.
/// </summary>
/// <typeparam name="T">Element type of the memory blocks.</typeparam>
public sealed class MemorySequence<T>
{
    private MemorySegment? _head;
    private MemorySegment? _tail;

    /// <summary>
    /// A read-only view spanning every appended block in full; an empty sequence when nothing was appended.
    /// </summary>
    public ReadOnlySequence<T> ReadOnlySequence
    {
        get
        {
            int lastLength = _tail is null ? 0 : _tail.Memory.Length;
            return CreateReadOnlySequence(0, lastLength);
        }
    }

    /// <summary>Appends <paramref name="buffer"/> as a new last segment.</summary>
    /// <returns>This instance, to allow chained calls.</returns>
    public MemorySequence<T> Append(ReadOnlyMemory<T> buffer)
    {
        if (_tail != null)
        {
            _tail = _tail.Append(buffer);
        }
        else
        {
            // First block: it is both the head and the tail of the chain.
            var first = new MemorySegment(buffer, 0);
            _head = first;
            _tail = first;
        }

        return this;
    }

    /// <summary>
    /// Creates a read-only view from <paramref name="firstBufferStartIndex"/> in the first segment to
    /// <paramref name="lastBufferEndIndex"/> in the last segment. Both indices are ignored and an
    /// empty sequence is returned when nothing has been appended.
    /// </summary>
    public ReadOnlySequence<T> CreateReadOnlySequence(int firstBufferStartIndex, int lastBufferEndIndex)
    {
        if (_tail == null)
        {
            return new ReadOnlySequence<T>(Array.Empty<T>());
        }

        return new ReadOnlySequence<T>(_head!, firstBufferStartIndex, _tail, lastBufferEndIndex);
    }

    /// <summary>One link in the segment chain.</summary>
    private sealed class MemorySegment : ReadOnlySequenceSegment<T>
    {
        public MemorySegment(ReadOnlyMemory<T> memory, long runningIndex)
        {
            RunningIndex = runningIndex;
            Memory = memory;
        }

        /// <summary>
        /// Links a new segment after this one; its running index is this segment's running index
        /// plus this segment's length.
        /// </summary>
        public MemorySegment Append(ReadOnlyMemory<T> nextMemory)
        {
            long nextIndex = RunningIndex + Memory.Length;
            var appended = new MemorySegment(nextMemory, nextIndex);
            Next = appended;
            return appended;
        }
    }
}
您不需要 trim,只需要对内存(Memory)进行相应的切片。
请注意,MemoryPool<T>.Rent(引自文档)返回“[...] 一个至少能够容纳 minBufferSize 个 T 类型元素的内存块”。这里的关键是“至少”,也就是说,返回的内存允许大于所请求的大小。
如果返回的内存恰好大于请求的大小,您只需先从中切出一个与请求大小相同的切片,再把该切片添加到 OwnedMemorySequence 的 _sequence 成员中。例如,可以使用上面的 Slice 扩展方法:seq.Append(buffer.Slice(0, bytesUsed)),其中 bytesUsed 是 Convert 通过 out 参数返回的已写入字节数。
WebSocket 服务器期望收到 JSON 字符串。_webSocket.SendAsync 发送的是 ReadOnlyMemory<byte>。问题在于 sequence.First 带有尾随的零字节,而这些尾随零会导致 JSON 消息无效。我该如何把它们去掉(trim)?
用法
// Build the JSON-RPC request object that will be sent to the server.
var request = new JsonRpcRequest<object>
{
JsonRpc = "2.0",
Id = 1,
Method = "public/get_instruments",
Params = @params
};
// Serialize the request to a JSON string.
var message = JsonSerializer.Serialize(request);
// Rent a pooled buffer, asking for the UTF-8 byte count of the message.
// NOTE(review): Rent only guarantees a minimum size, so buffer.Memory may be longer than the requested count.
// NOTE(review): `using var` disposes the buffer when this scope ends, while seq.Append below also registers it
// for disposal - confirm the buffer is not released before the output loop has sent it.
using var buffer = MemoryPool<byte>.Shared.Rent(Encoding.UTF8.GetByteCount(message));
// Encode the string into the rented buffer; the chars-used, bytes-used and completed out values are discarded.
Encoding.UTF8.GetEncoder().Convert(message, buffer.Memory.Span, true, out _, out _, out _);
var seq = new OwnedMemorySequence<byte>();
// Append passes the whole buffer.Memory to the sequence, including any unused tail beyond the encoded bytes.
seq.Append(buffer);
// Wrap the payload as a text message.
var msg = new ChannelWebSocket.Message
{
MessageType = WebSocketMessageType.Text,
Payload = seq
};
// Hand the message to the client's output (presumably the channel drained by OutputLoopAsync - confirm).
await client.Output.WriteAsync(msg).ConfigureAwait(false);
代码
/// <summary>
/// Reads messages from the output channel and sends each one over the web socket,
/// one frame per sequence segment, then closes the socket with a normal-closure status
/// once the enumeration of the channel ends.
/// </summary>
/// <param name="cancellationToken">
/// Token passed to the channel enumeration, to every send and to the close call.
/// </param>
/// <returns>A task that completes after the socket close call has finished.</returns>
private async Task OutputLoopAsync(CancellationToken cancellationToken)
{
    // Pass the token to the reader so a cancellation also interrupts an idle wait for the next message.
    await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
    {
        // Dispose the payload on every exit path: normal completion, the empty-payload
        // `continue`, and an exception thrown by SendAsync.
        using var payload = message.Payload;

        var sequence = payload.ReadOnlySequence;
        if (sequence.IsEmpty)
        {
            continue;
        }

        // Every segment except the last is sent as a non-final frame.
        while (!sequence.IsSingleSegment)
        {
            await _webSocket.SendAsync(sequence.First, message.MessageType, false, cancellationToken);
            sequence = sequence.Slice(sequence.First.Length);
        }

        // The remaining single segment ends the message.
        await _webSocket.SendAsync(sequence.First, message.MessageType, true, cancellationToken);
    }

    await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
}
/// <summary>
/// An outgoing web-socket message: the frame type together with the payload to send.
/// </summary>
public sealed class Message
{
    /// <summary>The web-socket message type used when the payload is sent.</summary>
    public WebSocketMessageType MessageType { get; set; }

    /// <summary>
    /// The owned byte sequence to send. Initialised with <c>null!</c>, so the creator
    /// is expected to assign it before the message is used.
    /// </summary>
    public OwnedMemorySequence<byte> Payload { get; set; } = null!;
}
/// <summary>
/// A sequence of memory blocks that also tracks the owners of those blocks,
/// so that they can all be released through a single <see cref="Dispose"/> call.
/// </summary>
/// <typeparam name="T">Element type of the memory blocks.</typeparam>
public sealed class OwnedMemorySequence<T> : IDisposable
{
    private readonly CollectionDisposable _disposable = new();
    private readonly MemorySequence<T> _sequence = new();

    /// <summary>A read-only view over everything appended so far.</summary>
    public ReadOnlySequence<T> ReadOnlySequence
    {
        get { return _sequence.ReadOnlySequence; }
    }

    /// <summary>
    /// Registers <paramref name="memoryOwner"/> for disposal and appends its
    /// entire <c>Memory</c> to the end of the sequence.
    /// </summary>
    /// <param name="memoryOwner">Owner whose memory becomes the next segment.</param>
    /// <returns>This instance, to allow chained calls.</returns>
    public OwnedMemorySequence<T> Append(IMemoryOwner<T> memoryOwner)
    {
        _disposable.Add(memoryOwner);
        _sequence.Append(memoryOwner.Memory);
        return this;
    }

    /// <summary>
    /// Creates a read-only view that starts at <paramref name="firstBufferStartIndex"/> within the
    /// first segment and ends at <paramref name="lastBufferEndIndex"/> within the last segment.
    /// </summary>
    public ReadOnlySequence<T> CreateReadOnlySequence(int firstBufferStartIndex, int lastBufferEndIndex)
        => _sequence.CreateReadOnlySequence(firstBufferStartIndex, lastBufferEndIndex);

    /// <summary>Disposes the collection that holds the registered memory owners.</summary>
    public void Dispose() => _disposable.Dispose();
}
/// <summary>
/// Extension methods that expose a sub-range of an <see cref="IMemoryOwner{T}"/> as another owner.
/// </summary>
public static class MemoryOwnerSliceExtensions
{
    /// <summary>
    /// Returns an owner whose memory is the <paramref name="length"/> elements of
    /// <paramref name="owner"/> starting at <paramref name="start"/>.
    /// The original owner is returned unchanged when the range covers its whole memory.
    /// </summary>
    public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> owner, int start, int length)
    {
        bool coversWholeMemory = start == 0 && length == owner.Memory.Length;
        return coversWholeMemory ? owner : new SliceOwner<T>(owner, start, length);
    }

    /// <summary>
    /// Returns an owner whose memory is everything in <paramref name="owner"/> from
    /// <paramref name="start"/> onwards. The original owner is returned unchanged when
    /// <paramref name="start"/> is zero.
    /// </summary>
    public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> owner, int start)
    {
        return start == 0 ? owner : new SliceOwner<T>(owner, start);
    }

    /// <summary>
    /// Owner wrapper that exposes a slice of the wrapped owner's memory and forwards disposal to it.
    /// </summary>
    private sealed class SliceOwner<T> : IMemoryOwner<T>
    {
        private readonly IMemoryOwner<T> _owner;

        public SliceOwner(IMemoryOwner<T> owner, int start, int length)
        {
            _owner = owner;
            Memory = owner.Memory.Slice(start, length);
        }

        public SliceOwner(IMemoryOwner<T> owner, int start)
        {
            _owner = owner;
            Memory = owner.Memory.Slice(start);
        }

        /// <summary>The sliced view over the wrapped owner's memory.</summary>
        public Memory<T> Memory { get; }

        /// <summary>Disposes the wrapped owner.</summary>
        public void Dispose() => _owner.Dispose();
    }
}
/// <summary>
/// Builds a <see cref="ReadOnlySequence{T}"/> out of memory blocks appended one after another.
/// </summary>
/// <typeparam name="T">Element type of the memory blocks.</typeparam>
public sealed class MemorySequence<T>
{
    private MemorySegment? _head;
    private MemorySegment? _tail;

    /// <summary>
    /// A read-only view spanning every appended block in full; an empty sequence when nothing was appended.
    /// </summary>
    public ReadOnlySequence<T> ReadOnlySequence
    {
        get
        {
            int lastLength = _tail is null ? 0 : _tail.Memory.Length;
            return CreateReadOnlySequence(0, lastLength);
        }
    }

    /// <summary>Appends <paramref name="buffer"/> as a new last segment.</summary>
    /// <returns>This instance, to allow chained calls.</returns>
    public MemorySequence<T> Append(ReadOnlyMemory<T> buffer)
    {
        if (_tail != null)
        {
            _tail = _tail.Append(buffer);
        }
        else
        {
            // First block: it is both the head and the tail of the chain.
            var first = new MemorySegment(buffer, 0);
            _head = first;
            _tail = first;
        }

        return this;
    }

    /// <summary>
    /// Creates a read-only view from <paramref name="firstBufferStartIndex"/> in the first segment to
    /// <paramref name="lastBufferEndIndex"/> in the last segment. Both indices are ignored and an
    /// empty sequence is returned when nothing has been appended.
    /// </summary>
    public ReadOnlySequence<T> CreateReadOnlySequence(int firstBufferStartIndex, int lastBufferEndIndex)
    {
        if (_tail == null)
        {
            return new ReadOnlySequence<T>(Array.Empty<T>());
        }

        return new ReadOnlySequence<T>(_head!, firstBufferStartIndex, _tail, lastBufferEndIndex);
    }

    /// <summary>One link in the segment chain.</summary>
    private sealed class MemorySegment : ReadOnlySequenceSegment<T>
    {
        public MemorySegment(ReadOnlyMemory<T> memory, long runningIndex)
        {
            RunningIndex = runningIndex;
            Memory = memory;
        }

        /// <summary>
        /// Links a new segment after this one; its running index is this segment's running index
        /// plus this segment's length.
        /// </summary>
        public MemorySegment Append(ReadOnlyMemory<T> nextMemory)
        {
            long nextIndex = RunningIndex + Memory.Length;
            var appended = new MemorySegment(nextMemory, nextIndex);
            Next = appended;
            return appended;
        }
    }
}
您不需要 trim,只需要对内存(Memory)进行相应的切片。
请注意,MemoryPool<T>.Rent(引自文档)返回“[...] 一个至少能够容纳 minBufferSize 个 T 类型元素的内存块”。这里的关键是“至少”,也就是说,返回的内存允许大于所请求的大小。
如果返回的内存恰好大于请求的大小,您只需先从中切出一个与请求大小相同的切片,再把该切片添加到 OwnedMemorySequence