< Summary - Combined Code Coverage

Information
Class: NLightning.Infrastructure.Node.Services.PeerCommunicationService
Assembly: NLightning.Infrastructure
File(s): /home/runner/work/NLightning/NLightning/src/NLightning.Infrastructure/Node/Services/PeerCommunicationService.cs
Tag: 57_24045730253
Line coverage
0%
Covered lines: 0
Uncovered lines: 143
Coverable lines: 143
Total lines: 321
Line coverage: 0%
Branch coverage
0%
Covered branches: 0
Total branches: 60
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

File(s)

/home/runner/work/NLightning/NLightning/src/NLightning.Infrastructure/Node/Services/PeerCommunicationService.cs

# | Line | Line coverage
 1using Microsoft.Extensions.DependencyInjection;
 2using Microsoft.Extensions.Logging;
 3
 4namespace NLightning.Infrastructure.Node.Services;
 5
 6using Domain.Channels.ValueObjects;
 7using Domain.Crypto.ValueObjects;
 8using Domain.Exceptions;
 9using Domain.Node.Interfaces;
 10using Domain.Persistence.Interfaces;
 11using Domain.Protocol.Constants;
 12using Domain.Protocol.Interfaces;
 13using Domain.Protocol.Messages;
 14using Domain.Protocol.Payloads;
 15
 16/// <summary>
 17/// Service for communication with a single peer.
 18/// </summary>
 19public class PeerCommunicationService : IPeerCommunicationService
 20{
 021    private readonly CancellationTokenSource _cts = new();
 22    private readonly ILogger<PeerCommunicationService> _logger;
 23    private readonly IMessageService _messageService;
 24    private readonly IPingPongService _pingPongService;
 25    private readonly IServiceProvider _serviceProvider;
 26    private readonly IMessageFactory _messageFactory;
 027    private readonly TaskCompletionSource<bool> _pingPongTcs = new();
 28
 29    private bool _isInitialized;
 30    private CancellationTokenSource? _initWaitCancellationTokenSource;
 31
 32    /// <inheritdoc />
 33    public event EventHandler<IMessage?>? MessageReceived;
 34
 35    /// <inheritdoc />
 36    public event EventHandler<Exception?>? DisconnectEvent;
 37
 38    /// <inheritdoc />
 39    public event EventHandler<Exception>? ExceptionRaised;
 40
 41    /// <inheritdoc />
 042    public bool IsConnected => _messageService.IsConnected;
 43
 44    /// <inheritdoc />
 045    public CompactPubKey PeerCompactPubKey { get; }
 46
 47    /// <summary>
 48    /// Initializes a new instance of the <see cref="PeerCommunicationService"/> class.
 49    /// </summary>
 50    /// <param name="logger">The logger.</param>
 51    /// <param name="messageService">The message service.</param>
 52    /// <param name="messageFactory">The message factory.</param>
 53    /// <param name="peerCompactPubKey">The peer's public key.</param>
 54    /// <param name="pingPongService">The ping pong service.</param>
 55    /// <param name="serviceProvider">The service provider.</param>
 056    public PeerCommunicationService(ILogger<PeerCommunicationService> logger, IMessageService messageService,
 057                                    IMessageFactory messageFactory, CompactPubKey peerCompactPubKey,
 058                                    IPingPongService pingPongService, IServiceProvider serviceProvider)
 59    {
 060        _logger = logger;
 061        _messageService = messageService;
 062        _messageFactory = messageFactory;
 063        PeerCompactPubKey = peerCompactPubKey;
 064        _pingPongService = pingPongService;
 065        _serviceProvider = serviceProvider;
 66
 067        _messageService.OnMessageReceived += HandleMessageReceived;
 068        _messageService.OnExceptionRaised += HandleExceptionRaised;
 069        _pingPongService.DisconnectEvent += HandleExceptionRaised;
 070    }
 71
 72    /// <inheritdoc />
 73    public async Task InitializeAsync(TimeSpan networkTimeout)
 74    {
 075        _logger.LogTrace("Waiting for init message from peer {peer}", PeerCompactPubKey);
 76
 77        // Set timeout to close connection if the other peer doesn't send an init message
 078        _initWaitCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token);
 079        _ = Task.Delay(networkTimeout, _initWaitCancellationTokenSource.Token).ContinueWith(task =>
 080        {
 081            if (!task.IsCanceled && !_isInitialized)
 082            {
 083                RaiseException(
 084                    new ConnectionException($"Peer {PeerCompactPubKey} did not send an init message before timeout"));
 085            }
 086        });
 87
 88        // Always send an init message upon connection
 089        _logger.LogTrace("Sending init message to peer {peer}", PeerCompactPubKey);
 090        var initMessage = _messageFactory.CreateInitMessage();
 91        try
 92        {
 093            await _messageService.SendMessageAsync(initMessage, true, _cts.Token);
 094        }
 095        catch (Exception e)
 96        {
 097            _pingPongTcs.TrySetResult(true);
 098            throw new ConnectionException($"Failed to send init message to peer {PeerCompactPubKey}", e);
 99        }
 100
 101        // Set up ping service to keep connection alive
 0102        if (!_cts.IsCancellationRequested)
 103        {
 0104            if (!_messageService.IsConnected)
 0105                throw new ConnectionException($"Failed to connect to peer {PeerCompactPubKey}");
 106
 0107            SetupPingPongService();
 108        }
 0109    }
 110
 111    /// <inheritdoc />
 112    public async Task SendMessageAsync(IMessage message, CancellationToken cancellationToken = default)
 113    {
 114        try
 115        {
 0116            await _messageService.SendMessageAsync(message, cancellationToken: cancellationToken);
 0117        }
 0118        catch (Exception ex)
 119        {
 0120            RaiseException(new ConnectionException($"Failed to send message to peer {PeerCompactPubKey}", ex));
 0121        }
 0122    }
 123
 124    /// <inheritdoc/>
 125    public async Task SendWarningAsync(WarningException we, CancellationToken cancellationToken = default)
 126    {
 127        try
 128        {
 0129            var message = we.Message;
 0130            ChannelId? channelId = null;
 0131            if (we is ChannelWarningException cwe)
 132            {
 0133                message = cwe.PeerMessage ?? we.Message;
 0134                channelId = cwe.ChannelId;
 135            }
 136
 0137            var warningMessage = _messageFactory.CreateWarningMessage(message, channelId);
 0138            await _messageService.SendMessageAsync(warningMessage, cancellationToken: cancellationToken);
 0139        }
 0140        catch (Exception ex)
 141        {
 0142            RaiseException(new ConnectionException($"Failed to send message to peer {PeerCompactPubKey}", ex));
 0143        }
 0144    }
 145
 146    /// <inheritdoc />
 147    public void Disconnect(Exception? exception = null)
 148    {
 149        try
 150        {
 0151            SendExceptionMessage(exception).GetAwaiter().GetResult();
 152
 0153            _ = _cts.CancelAsync();
 0154            _logger.LogTrace("Waiting for ping service to stop for peer {peer}", PeerCompactPubKey);
 0155            _pingPongTcs.Task.Wait(TimeSpan.FromSeconds(5));
 0156            _logger.LogTrace("Ping service stopped for peer {peer}", PeerCompactPubKey);
 0157        }
 158        finally
 159        {
 0160            DisconnectEvent?.Invoke(this, exception);
 0161        }
 0162    }
 163
 164    private void SetupPingPongService()
 165    {
 0166        _pingPongService.OnPingMessageReady += HandlePingMessageReady;
 0167        _pingPongService.OnPongReceived += HandlePongReceived;
 168
 169        // Setup Ping to keep connection alive
 0170        _ = _pingPongService.StartPingAsync(_cts.Token).ContinueWith(_ =>
 0171        {
 0172            _logger.LogTrace("Ping service stopped for peer {peer}, setting result", PeerCompactPubKey);
 0173            _pingPongTcs.TrySetResult(true);
 0174        });
 175
 0176        _logger.LogInformation("Ping service started for peer {peer}", PeerCompactPubKey);
 0177    }
 178
 179    private void HandlePongReceived(object? sender, EventArgs e)
 180    {
 0181        using var scope = _serviceProvider.CreateScope();
 0182        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
 183
 0184        uow.PeerDbRepository.UpdatePeerLastSeenAsync(PeerCompactPubKey).GetAwaiter().GetResult();
 0185        uow.SaveChanges();
 0186    }
 187
 188    private void HandlePingMessageReady(object? sender, IMessage pingMessage)
 189    {
 190        // We can only send ping messages if the peer is initialized
 0191        if (!_isInitialized)
 0192            return;
 193
 194        try
 195        {
 0196            _messageService.SendMessageAsync(pingMessage, cancellationToken: _cts.Token).GetAwaiter().GetResult();
 0197        }
 0198        catch (Exception ex)
 199        {
 0200            RaiseException(new ConnectionException($"Failed to send ping message to peer {PeerCompactPubKey}", ex));
 0201        }
 0202    }
 203
 204    private void HandleMessageReceived(object? sender, IMessage? message)
 205    {
 0206        if (message is null)
 207        {
 0208            return;
 209        }
 210
 0211        if (!_isInitialized && message.Type == MessageTypes.Init)
 212        {
 0213            _isInitialized = true;
 0214            _initWaitCancellationTokenSource?.Cancel();
 215        }
 216
 217        // Forward the message to subscribers
 0218        MessageReceived?.Invoke(this, message);
 219
 220        // Handle ping messages internally
 0221        if (_isInitialized && message.Type == MessageTypes.Ping)
 222        {
 0223            _ = HandlePingAsync(message);
 224        }
 0225        else if (_isInitialized && message.Type == MessageTypes.Pong)
 226        {
 0227            _pingPongService.HandlePong(message);
 228        }
 0229    }
 230
 231    private async Task HandlePingAsync(IMessage pingMessage)
 232    {
 0233        var pongMessage = _messageFactory.CreatePongMessage(pingMessage);
 0234        await _messageService.SendMessageAsync(pongMessage);
 0235    }
 236
 237    private void HandleExceptionRaised(object? sender, Exception e)
 238    {
 0239        RaiseException(e);
 0240    }
 241
 242    private Task SendExceptionMessage(Exception? exception)
 243    {
 244        switch (exception)
 245        {
 246            case ConnectionException:
 247            case null:
 0248                return Task.CompletedTask;
 249            case ErrorException errorException:
 250                {
 0251                    ChannelId? channelId = null;
 0252                    var message = errorException.Message;
 253
 0254                    if (errorException is ChannelErrorException channelErrorException)
 255                    {
 0256                        channelId = channelErrorException.ChannelId;
 0257                        if (!string.IsNullOrWhiteSpace(channelErrorException.PeerMessage))
 0258                            message = channelErrorException.PeerMessage;
 259                    }
 260
 0261                    _logger.LogTrace("Sending error message to peer {peer}. ChannelId: {channelId}, Message: {message}",
 0262                                     PeerCompactPubKey, channelId, message);
 263
 0264                    return _messageService.SendMessageAsync(
 0265                        new ErrorMessage(new ErrorPayload(channelId, message)));
 266                }
 267            case WarningException warningException:
 268                {
 0269                    ChannelId? channelId = null;
 0270                    var message = warningException.Message;
 271
 0272                    if (warningException is ChannelWarningException channelWarningException)
 273                    {
 0274                        channelId = channelWarningException.ChannelId;
 0275                        if (!string.IsNullOrWhiteSpace(channelWarningException.PeerMessage))
 0276                            message = channelWarningException.PeerMessage;
 277                    }
 278
 0279                    _logger.LogTrace("Sending warning message to peer {peer}. ChannelId: {channelId}, Message: {message}
 0280                                     PeerCompactPubKey, channelId, message);
 281
 0282                    return _messageService.SendMessageAsync(
 0283                        new WarningMessage(new ErrorPayload(channelId, message)));
 284                }
 285            default:
 0286                return Task.CompletedTask;
 287        }
 288    }
 289
 290    private void RaiseException(Exception exception)
 291    {
 0292        var mustDisconnect = false;
 0293        if (exception is ErrorException)
 0294            mustDisconnect = true;
 295
 0296        _ = Task.Run(() => SendExceptionMessage(exception));
 297
 298        // Forward the exception to subscribers
 0299        ExceptionRaised?.Invoke(this, exception);
 300
 301        // Disconnect if not already disconnecting
 0302        if (mustDisconnect && !_cts.IsCancellationRequested)
 303        {
 0304            _logger.LogWarning(exception, "We're disconnecting peer {peer} because of an exception",
 0305                               PeerCompactPubKey);
 0306            Disconnect();
 307        }
 0308    }
 309
 310    public void Dispose()
 311    {
 312        // Unsubscribe from events
 0313        _messageService.OnMessageReceived -= HandleMessageReceived;
 0314        _messageService.OnExceptionRaised -= HandleExceptionRaised;
 0315        _pingPongService.DisconnectEvent -= HandleExceptionRaised;
 316
 0317        _cts.Dispose();
 0318        _messageService.Dispose();
 0319        _initWaitCancellationTokenSource?.Dispose();
 0320    }
 321}