| | | 1 | | using System.Buffers; |
| | | 2 | | using MessagePack; |
| | | 3 | | |
| | | 4 | | namespace NLightning.Daemon.Services.Ipc; |
| | | 5 | | |
| | | 6 | | using Daemon.Ipc.Interfaces; |
| | | 7 | | using Transport.Ipc; |
| | | 8 | | |
/// <summary>
/// Length-prefixed MessagePack framing for <see cref="IpcEnvelope"/>.
/// A frame is a 4-byte little-endian signed length followed by exactly that many
/// bytes of MessagePack-serialized payload.
/// </summary>
public sealed class LengthPrefixedIpcFraming : IIpcFraming
{
    /// <summary>Size, in bytes, of the length header that precedes every payload.</summary>
    private const int HeaderSize = sizeof(int);

    /// <summary>Largest payload, in bytes, accepted when reading or emitted when writing.</summary>
    private const int MaxFrameLength = 10_000_000;

    /// <summary>
    /// Reads one frame from <paramref name="stream"/> and deserializes its payload.
    /// </summary>
    /// <param name="stream">Stream to read the frame from.</param>
    /// <param name="ct">Token forwarded to the stream reads and to the serializer.</param>
    /// <returns>The envelope deserialized from the frame payload.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="IOException">The declared length is not in the range 1..10,000,000.</exception>
    /// <exception cref="EndOfStreamException">The stream ended before a full frame was read.</exception>
    public async Task<IpcEnvelope> ReadAsync(Stream stream, CancellationToken ct)
    {
        ArgumentNullException.ThrowIfNull(stream);

        var header = new byte[HeaderSize];
        await ReadExactAsync(stream, header, ct);

        // Explicit little-endian keeps the wire format independent of host architecture
        // (BitConverter follows the machine's byte order).
        var length = System.Buffers.Binary.BinaryPrimitives.ReadInt32LittleEndian(header);
        if (length is <= 0 or > MaxFrameLength)
        {
            throw new IOException("Invalid IPC frame length.");
        }

        var rented = ArrayPool<byte>.Shared.Rent(length);
        try
        {
            // Rent may hand back a larger array than requested, so always work on the exact slice.
            var payload = rented.AsMemory(0, length);
            await ReadExactAsync(stream, payload, ct);
            return MessagePackSerializer.Deserialize<IpcEnvelope>(payload, cancellationToken: ct);
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(rented);
        }
    }

    /// <summary>
    /// Serializes <paramref name="envelope"/> and writes it to <paramref name="stream"/> as one frame,
    /// then flushes the stream.
    /// </summary>
    /// <param name="stream">Stream to write the frame to.</param>
    /// <param name="envelope">Envelope to serialize as the frame payload.</param>
    /// <param name="ct">Token forwarded to the serializer and to the stream writes.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="stream"/> or <paramref name="envelope"/> is null.
    /// </exception>
    /// <exception cref="IOException">The serialized payload exceeds 10,000,000 bytes.</exception>
    public async Task WriteAsync(Stream stream, IpcEnvelope envelope, CancellationToken ct)
    {
        ArgumentNullException.ThrowIfNull(stream);
        ArgumentNullException.ThrowIfNull(envelope);

        var payload = MessagePackSerializer.Serialize(envelope, cancellationToken: ct);

        // Fail before touching the stream: ReadAsync rejects frames above this size, so emitting
        // one would only surface as an error on the receiving side after the bytes were sent.
        if (payload.Length > MaxFrameLength)
        {
            throw new IOException("IPC frame length exceeds the maximum allowed size.");
        }

        var header = new byte[HeaderSize];
        System.Buffers.Binary.BinaryPrimitives.WriteInt32LittleEndian(header, payload.Length);

        await stream.WriteAsync(header, ct);
        await stream.WriteAsync(payload, ct);
        await stream.FlushAsync(ct);
    }

    /// <summary>
    /// Reads from <paramref name="stream"/> until <paramref name="buffer"/> is completely filled.
    /// </summary>
    /// <exception cref="EndOfStreamException">
    /// A read returned zero bytes before the buffer was filled.
    /// </exception>
    private static async Task ReadExactAsync(Stream stream, Memory<byte> buffer, CancellationToken ct)
    {
        var total = 0;
        while (total < buffer.Length)
        {
            // A single read may return fewer bytes than requested; keep going from where it stopped.
            var read = await stream.ReadAsync(buffer[total..], ct);
            if (read == 0)
            {
                throw new EndOfStreamException("Stream ended before the full IPC frame was read.");
            }

            total += read;
        }
    }
}