< Summary - Combined Code Coverage

Information
Class: NLightning.Application.Node.Managers.PeerManager
Assembly: NLightning.Application
File(s): /home/runner/work/NLightning/NLightning/src/NLightning.Application/Node/Managers/PeerManager.cs
Tag: 57_24045730253
Line coverage
0%
Covered lines: 0
Uncovered lines: 195
Coverable lines: 195
Total lines: 385
Line coverage: 0%
Branch coverage
0%
Covered branches: 0
Total branches: 68
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

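| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 100% | 2 | 1 | 0% |
| StartAsync() | 0% | 110 | 10 | 0% |
| StopAsync() | 0% | 72 | 8 | 0% |
| ConnectToPeerAsync() | 100% | 2 | 1 | 0% |
| DisconnectPeer(...) | 0% | 20 | 4 | 0% |
| ListPeers() | 100% | 2 | 1 | 0% |
| GetPeer(...) | 100% | 2 | 1 | 0% |
| DisconnectPeer(...) | 100% | 2 | 1 | 0% |
| ConnectToPeerAsync() | 0% | 72 | 8 | 0% |
| HandleNewPeerConnected(...) | 0% | 72 | 8 | 0% |
| HandlePeerDisconnection(...) | 0% | 6 | 2 | 0% |
| HandlePeerChannelMessage(...) | 0% | 20 | 4 | 0% |
| HandleChannelMessageResponseAsync() | 0% | 420 | 20 | 0% |
| HandleResponseMessageReady(...) | 0% | 20 | 4 | 0% |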

File(s)

/home/runner/work/NLightning/NLightning/src/NLightning.Application/Node/Managers/PeerManager.cs

#LineLine coverage
 1using System.Net.Sockets;
 2using Microsoft.Extensions.DependencyInjection;
 3using Microsoft.Extensions.Logging;
 4
 5namespace NLightning.Application.Node.Managers;
 6
 7using Domain.Channels.Enums;
 8using Domain.Channels.Events;
 9using Domain.Channels.Interfaces;
 10using Domain.Crypto.ValueObjects;
 11using Domain.Exceptions;
 12using Domain.Node.Constants;
 13using Domain.Node.Events;
 14using Domain.Node.Interfaces;
 15using Domain.Node.Models;
 16using Domain.Node.ValueObjects;
 17using Domain.Persistence.Interfaces;
 18using Domain.Protocol.Constants;
 19using Domain.Protocol.Interfaces;
 20using Infrastructure.Protocol.Models;
 21using Infrastructure.Transport.Events;
 22using Infrastructure.Transport.Interfaces;
 23
 24/// <summary>
 25/// Service for managing peers.
 26/// </summary>
 27/// <remarks>
 28/// This class is used to manage peers in the network.
 29/// </remarks>
 30/// <seealso cref="IPeerManager" />
 31public sealed class PeerManager : IPeerManager
 32{
 33    private readonly IChannelManager _channelManager;
 34    private readonly ILogger<PeerManager> _logger;
 35    private readonly IPeerServiceFactory _peerServiceFactory;
 36    private readonly ITcpService _tcpService;
 37    private readonly IServiceProvider _serviceProvider;
 038    private readonly Dictionary<CompactPubKey, PeerModel> _peers = [];
 39
 40    private CancellationTokenSource? _cts;
 41
 042    public PeerManager(IChannelManager channelManager, ILogger<PeerManager> logger,
 043                       IPeerServiceFactory peerServiceFactory, ITcpService tcpService, IServiceProvider serviceProvider)
 44    {
 045        _channelManager = channelManager;
 046        _logger = logger;
 047        _peerServiceFactory = peerServiceFactory;
 048        _tcpService = tcpService;
 049        _serviceProvider = serviceProvider;
 050    }
 51
 52    public async Task StartAsync(CancellationToken cancellationToken)
 53    {
 054        _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 55
 056        _tcpService.OnNewPeerConnected += HandleNewPeerConnected;
 57
 058        _channelManager.OnResponseMessageReady += HandleResponseMessageReady;
 59
 60        // Load peers and initialize the channel manager
 061        using var scope = _serviceProvider.CreateScope();
 062        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
 063        var peers = await uow.GetPeersForStartupAsync();
 064        foreach (var peer in peers)
 65        {
 66            try
 67            {
 068                _ = await ConnectToPeerAsync(peer.PeerAddressInfo, uow);
 069                if (!_peers.TryGetValue(peer.NodeId, out _))
 70                {
 071                    _logger.LogWarning("Unable to connect to peer {PeerId} on startup", peer.NodeId);
 72                    // TODO: Handle this case, maybe retry or log more details
 073                    continue;
 74                }
 75
 76                // Register channels with peer
 077                if (peer.Channels is not { Count: > 0 })
 078                    continue;
 79
 80                // Only register channels that are not closed or stale
 081                foreach (var channel in peer.Channels.Where(c => c.State != ChannelState.Closed))
 82                    // We don't care about the result here, as we just want to register the existing channels
 083                    _ = _channelManager.RegisterExistingChannelAsync(channel);
 084            }
 085            catch (ConnectionException)
 86            {
 087                _logger.LogWarning("Unable to connect to peer {PeerId} on startup", peer.NodeId);
 088            }
 089            catch (Exception e)
 90            {
 091                _logger.LogError(e, "Error connecting to peer {PeerId} on startup", peer.NodeId);
 092            }
 093        }
 94
 095        await uow.SaveChangesAsync();
 96
 097        await _tcpService.StartListeningAsync(_cts.Token);
 098    }
 99
 100    public async Task StopAsync()
 101    {
 0102        if (_cts is null)
 0103            throw new InvalidOperationException($"{nameof(PeerManager)} is not running");
 104
 0105        foreach (var peerKey in _peers.Keys)
 106            try
 107            {
 0108                DisconnectPeer(peerKey);
 0109            }
 0110            catch (Exception e)
 111            {
 0112                _logger.LogWarning(e, "Error disconnecting peer {Peer}", peerKey);
 0113            }
 114
 115        try
 116        {
 117            // Give it a 5-second timeout to disconnect all peers
 0118            var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
 0119            while (_peers.Count > 0 && !_cts.IsCancellationRequested)
 0120                await Task.Delay(TimeSpan.FromSeconds(1), timeoutTokenSource.Token);
 0121        }
 0122        catch (TaskCanceledException)
 123        {
 0124            _logger.LogWarning("Timeout while waiting for peers to disconnect");
 0125        }
 126
 0127        await _cts.CancelAsync();
 0128    }
 129
 130    /// <inheritdoc />
 131    /// <exception cref="ConnectionException">Thrown when the connection to the peer fails.</exception>
 132    /// <exception cref="InvalidOperationException">Thrown when the connection to the peer already exists.</exception>
 133    public async Task<PeerModel> ConnectToPeerAsync(PeerAddressInfo peerAddressInfo)
 134    {
 0135        using var scope = _serviceProvider.CreateScope();
 0136        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
 137
 0138        var peer = await ConnectToPeerAsync(peerAddressInfo, uow);
 139
 0140        await uow.SaveChangesAsync();
 141
 0142        return peer;
 0143    }
 144
 145    /// <inheritdoc />
 146    public void DisconnectPeer(CompactPubKey pubKey, Exception? exception = null)
 147    {
 0148        if (_peers.TryGetValue(pubKey, out var peer))
 149        {
 0150            if (!peer.TryGetPeerService(out var peerService))
 151            {
 0152                _logger.LogWarning("PeerService not found for {Peer}", pubKey);
 0153                return;
 154            }
 155
 0156            DisconnectPeer(peerService, exception);
 157        }
 158        else
 159        {
 0160            _logger.LogWarning("Peer {Peer} not found", pubKey);
 161        }
 0162    }
 163
 164    public List<PeerModel> ListPeers()
 165    {
 0166        return _peers.Values.ToList();
 167    }
 168
 169    public PeerModel? GetPeer(CompactPubKey peerId)
 170    {
 0171        return _peers.GetValueOrDefault(peerId);
 172    }
 173
 174    private static void DisconnectPeer(IPeerService peerService, Exception? exception = null)
 175    {
 0176        peerService.Disconnect(exception);
 0177    }
 178
 179    private async Task<PeerModel> ConnectToPeerAsync(PeerAddressInfo peerAddressInfo, IUnitOfWork uow)
 180    {
 181        // Convert and validate the address
 0182        var peerAddress = new PeerAddress(peerAddressInfo.Address);
 183
 184        // Check if we're already connected to the peer
 0185        if (_peers.ContainsKey(peerAddress.PubKey))
 186        {
 0187            throw new InvalidOperationException($"Already connected to peer {peerAddress.PubKey}");
 188        }
 189
 190        // Connect to the peer
 0191        var connectedPeer = await _tcpService.ConnectToPeerAsync(peerAddress);
 192
 0193        var peerService = await _peerServiceFactory.CreateConnectedPeerAsync(connectedPeer.CompactPubKey,
 0194                                                                             connectedPeer.TcpClient);
 0195        peerService.OnDisconnect += HandlePeerDisconnection;
 0196        peerService.OnChannelMessageReceived += HandlePeerChannelMessage;
 197
 0198        var preferredHost = connectedPeer.Host;
 0199        var preferredPort = connectedPeer.Port;
 200
 201        // Check if the node has set it's preferred address
 0202        if (peerService.PreferredHost is not null)
 0203            preferredHost = peerService.PreferredHost;
 204
 0205        if (peerService.PreferredPort is not null)
 0206            preferredPort = peerService.PreferredPort.Value;
 207
 0208        var peer = new PeerModel(connectedPeer.CompactPubKey, preferredHost, preferredPort,
 0209                                 connectedPeer.TcpClient.Client.ProtocolType == ProtocolType.IPv6 ? "IPv6" : "IPv4")
 0210        {
 0211            LastSeenAt = DateTime.UtcNow
 0212        };
 0213        peer.SetPeerService(peerService);
 214
 0215        _peers.Add(connectedPeer.CompactPubKey, peer);
 216
 0217        await uow.PeerDbRepository.AddOrUpdateAsync(peer);
 218
 0219        return peer;
 0220    }
 221
 222    private void HandleNewPeerConnected(object? _, NewPeerConnectedEventArgs args)
 223    {
 224        try
 225        {
 226            // Create the peer
 0227            var peerService = _peerServiceFactory.CreateConnectingPeerAsync(args.TcpClient).GetAwaiter().GetResult();
 0228            peerService.OnDisconnect += HandlePeerDisconnection;
 0229            peerService.OnChannelMessageReceived += HandlePeerChannelMessage;
 230
 0231            _logger.LogTrace("PeerService created for peer {PeerPubKey}", peerService.PeerPubKey);
 232
 0233            var preferredHost = args.Host;
 0234            var preferredPort = NodeConstants.DefaultPort;
 235
 236            // Check if the node has set it's preferred address
 0237            if (peerService.PreferredHost is not null)
 0238                preferredHost = peerService.PreferredHost;
 239
 0240            if (peerService.PreferredPort is not null)
 0241                preferredPort = peerService.PreferredPort.Value;
 242
 0243            var peer = new PeerModel(peerService.PeerPubKey, preferredHost, preferredPort,
 0244                                     args.TcpClient.Client.ProtocolType == ProtocolType.IPv6 ? "IPv6" : "IPv4")
 0245            {
 0246                LastSeenAt = DateTime.UtcNow
 0247            };
 0248            peer.SetPeerService(peerService);
 249
 0250            if (preferredHost != "127.0.0.1")
 251            {
 252                // Get a context to save the peer to the database
 0253                using var scope = _serviceProvider.CreateScope();
 0254                using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
 255
 0256                uow.PeerDbRepository.AddOrUpdateAsync(peer);
 0257                uow.SaveChanges();
 258            }
 259
 0260            _peers.Add(peerService.PeerPubKey, peer);
 0261        }
 0262        catch (Exception e)
 263        {
 0264            _logger.LogError(e, "Error handling new peer connection from {Host}:{Port}", args.Host, args.Port);
 0265        }
 0266    }
 267
 268    private void HandlePeerDisconnection(object? sender, PeerDisconnectedEventArgs args)
 269    {
 0270        ArgumentNullException.ThrowIfNull(args);
 271
 0272        _peers.Remove(args.PeerPubKey);
 0273        _logger.LogInformation("Peer {Peer} disconnected", args.PeerPubKey);
 274
 0275        if (sender is IPeerService peerService)
 276        {
 0277            peerService.OnDisconnect -= HandlePeerDisconnection;
 0278            peerService.OnChannelMessageReceived -= HandlePeerChannelMessage;
 0279            peerService.Dispose();
 280        }
 281        else
 282        {
 0283            _logger.LogWarning("Peer {Peer} disconnected, but we were unable to detach event handlers",
 0284                               args.PeerPubKey);
 285        }
 0286    }
 287
 288    private void HandlePeerChannelMessage(object? _, ChannelMessageEventArgs args)
 289    {
 0290        ArgumentNullException.ThrowIfNull(args);
 291
 0292        if (!_peers.TryGetValue(args.PeerPubKey, out var peer))
 0293            throw new ConnectionException($"Peer {args.PeerPubKey} not found while handling channel message");
 294
 0295        if (!peer.TryGetPeerService(out var peerService))
 0296            throw new ConnectionException(
 0297                $"PeerService not found for peer {args.PeerPubKey} while handling channel message");
 298
 0299        _channelManager.HandleChannelMessageAsync(args.Message, peerService.Features, peerService.PeerPubKey)
 0300                       .ContinueWith(task => HandleChannelMessageResponseAsync(task, peerService.PeerPubKey,
 0301                                                                               args.Message.Type));
 0302    }
 303
 304    private async Task HandleChannelMessageResponseAsync(Task<IChannelMessage?> task, CompactPubKey peerPubKey,
 305                                                         MessageTypes messageType)
 306    {
 0307        if (!_peers.TryGetValue(peerPubKey, out var peer))
 0308            throw new ConnectionException($"Peer {peerPubKey} not found while handling channel response message");
 309
 0310        if (!peer.TryGetPeerService(out var peerService))
 0311            throw new ConnectionException(
 0312                $"PeerService not found for peer {peerPubKey} while handling channel response message");
 313
 0314        if (task.IsFaulted)
 315        {
 0316            if (task.Exception is { InnerException: ChannelErrorException cee })
 317            {
 0318                _logger.LogError(
 0319                    "Error handling channel message ({messageType}) from peer {peer}: {message}",
 0320                    Enum.GetName(messageType), peerService.PeerPubKey,
 0321                    !string.IsNullOrEmpty(cee.PeerMessage)
 0322                        ? cee.PeerMessage
 0323                        : cee.Message);
 324
 0325                DisconnectPeer(peerService, cee);
 0326                return;
 327            }
 328
 0329            if (task.Exception is { InnerException: ChannelWarningException cwe })
 330            {
 0331                _logger.LogWarning(
 0332                    "Error handling channel message ({messageType}) from peer {peer}: {message}",
 0333                    Enum.GetName(messageType), peerService.PeerPubKey,
 0334                    !string.IsNullOrEmpty(cwe.PeerMessage)
 0335                        ? cwe.PeerMessage
 0336                        : cwe.Message);
 337
 0338                _ = peerService.SendWarningAsync(cwe)
 0339                               .ContinueWith(warningTask =>
 0340                                {
 0341                                    if (warningTask.IsFaulted)
 0342                                    {
 0343                                        _logger.LogError(warningTask.Exception,
 0344                                                         "Failed to send warning message to peer {Peer}",
 0345                                                         peerService.PeerPubKey);
 0346                                    }
 0347                                }, TaskContinuationOptions.OnlyOnFaulted);
 348
 0349                return;
 350            }
 351
 0352            _logger.LogError(
 0353                task.Exception, "Error handling channel message ({messageType}) from peer {peer}",
 0354                Enum.GetName(messageType), peerService.PeerPubKey);
 355
 0356            DisconnectPeer(peerService);
 0357            return;
 358        }
 359
 0360        var replyMessage = task.Result;
 0361        if (replyMessage is not null)
 0362            await peerService.SendMessageAsync(replyMessage);
 0363    }
 364
 365    private void HandleResponseMessageReady(object? sender, ChannelResponseMessageEventArgs args)
 366    {
 0367        ArgumentNullException.ThrowIfNull(args);
 368
 369        // Find PeerService by CompactPubKey
 0370        if (!_peers.TryGetValue(args.PeerPubKey, out var peer))
 0371            throw new ConnectionException($"Peer {args.PeerPubKey} not found while handling response message");
 372
 0373        if (!peer.TryGetPeerService(out var peerService))
 0374            throw new ConnectionException(
 0375                $"PeerService not found for peer {args.PeerPubKey} while handling response message");
 376
 377        // Send the response message to the peer
 0378        peerService.SendMessageAsync(args.ResponseMessage)
 0379                   .ContinueWith(task =>
 0380                    {
 0381                        _logger.LogError(task.Exception, "Failed to send response message to peer {Peer}",
 0382                                         args.PeerPubKey);
 0383                    }, TaskContinuationOptions.OnlyOnFaulted);
 0384    }
 385}