| | | 1 | | using Microsoft.Extensions.Logging; |
| | | 2 | | |
| | | 3 | | namespace NLightning.Daemon.Handlers; |
| | | 4 | | |
| | | 5 | | using Domain.Bitcoin.Interfaces; |
| | | 6 | | using Domain.Channels.Enums; |
| | | 7 | | using Domain.Channels.Events; |
| | | 8 | | using Domain.Channels.Interfaces; |
| | | 9 | | using Domain.Channels.ValueObjects; |
| | | 10 | | using Domain.Client.Constants; |
| | | 11 | | using Domain.Client.Enums; |
| | | 12 | | using Domain.Client.Exceptions; |
| | | 13 | | using Domain.Client.Requests; |
| | | 14 | | using Domain.Client.Responses; |
| | | 15 | | using Domain.Crypto.ValueObjects; |
| | | 16 | | using Domain.Exceptions; |
| | | 17 | | using Domain.Node.Events; |
| | | 18 | | using Domain.Node.Interfaces; |
| | | 19 | | using Interfaces; |
| | | 20 | | |
| | | 21 | | public class OpenChannelClientSubscriptionHandler : |
| | | 22 | | IClientCommandHandler<OpenChannelClientSubscriptionRequest, OpenChannelClientSubscriptionResponse> |
| | | 23 | | { |
| | | 24 | | private readonly IChannelMemoryRepository _channelMemoryRepository; |
| | | 25 | | private readonly ILogger<OpenChannelClientSubscriptionHandler> _logger; |
| | | 26 | | private readonly IPeerManager _peerManager; |
| | | 27 | | private readonly IUtxoMemoryRepository _utxoMemoryRepository; |
| | | 28 | | |
| | | 29 | | private ChannelId _channelId; |
| | | 30 | | private IPeerService? _peerService; |
| | | 31 | | |
| | | 32 | | /// <inheritdoc/> |
| | 0 | 33 | | public ClientCommand Command => ClientCommand.OpenChannelSubscription; |
| | | 34 | | |
| | 0 | 35 | | public OpenChannelClientSubscriptionHandler(IChannelMemoryRepository channelMemoryRepository, |
| | 0 | 36 | | ILogger<OpenChannelClientSubscriptionHandler> logger, |
| | 0 | 37 | | IPeerManager peerManager, IUtxoMemoryRepository utxoMemoryRepository) |
| | | 38 | | { |
| | 0 | 39 | | _channelMemoryRepository = channelMemoryRepository; |
| | 0 | 40 | | _logger = logger; |
| | 0 | 41 | | _peerManager = peerManager; |
| | 0 | 42 | | _utxoMemoryRepository = utxoMemoryRepository; |
| | 0 | 43 | | } |
| | | 44 | | |
| | | 45 | | /// <inheritdoc/> |
| | | 46 | | public async Task<OpenChannelClientSubscriptionResponse> HandleAsync(OpenChannelClientSubscriptionRequest request, |
| | | 47 | | CancellationToken ct) |
| | | 48 | | { |
| | 0 | 49 | | if (request.ChannelId == ChannelId.Zero) |
| | 0 | 50 | | throw new ClientException(ErrorCodes.InvalidChannel, "ChannelId cannot be empty"); |
| | | 51 | | |
| | 0 | 52 | | _channelId = request.ChannelId; |
| | | 53 | | |
| | 0 | 54 | | if (!_channelMemoryRepository.TryGetChannel(_channelId, out var channel)) |
| | 0 | 55 | | throw new ClientException(ErrorCodes.InvalidChannel, $"Channel with Id {_channelId} not found"); |
| | | 56 | | |
| | 0 | 57 | | var peer = _peerManager.GetPeer(channel.RemoteNodeId) ?? throw new ClientException(ErrorCodes.InvalidOperation, |
| | 0 | 58 | | $"Peer with NodeId {channel.RemoteNodeId} is not connected"); |
| | | 59 | | |
| | | 60 | | // Create a task completion source for the response |
| | 0 | 61 | | var tsc = new TaskCompletionSource<OpenChannelClientSubscriptionResponse>( |
| | 0 | 62 | | TaskCreationOptions.RunContinuationsAsynchronously); |
| | | 63 | | |
| | | 64 | | // If it's in a state we consider Open, return immediately |
| | 0 | 65 | | if (channel.State is ChannelState.ReadyForUs or ChannelState.ReadyForThem or ChannelState.Open) |
| | | 66 | | { |
| | 0 | 67 | | return new OpenChannelClientSubscriptionResponse(channel.ChannelId) |
| | 0 | 68 | | { |
| | 0 | 69 | | ChannelState = ChannelState.ReadyForUs, |
| | 0 | 70 | | TxId = channel.FundingOutput?.TransactionId, |
| | 0 | 71 | | Index = channel.FundingOutput?.Index |
| | 0 | 72 | | }; |
| | | 73 | | } |
| | | 74 | | |
| | | 75 | | // Check if the channel is already in a state we care about |
| | 0 | 76 | | var lockedUtxos = _utxoMemoryRepository.GetLockedUtxosForChannel(_channelId); |
| | 0 | 77 | | if (channel.State is not ChannelState.V1FundingSigned && lockedUtxos.Count == 0) |
| | 0 | 78 | | throw new ClientException(ErrorCodes.InvalidOperation, $"No locked UTXOs found for channel {_channelId}"); |
| | | 79 | | |
| | | 80 | | try |
| | | 81 | | { |
| | 0 | 82 | | if (!peer.TryGetPeerService(out _peerService)) |
| | 0 | 83 | | throw new ClientException(ErrorCodes.InvalidOperation, "Error getting peerService from peer"); |
| | | 84 | | |
| | | 85 | | // Subscribe to the events |
| | 0 | 86 | | _peerService.OnAttentionMessageReceived += AttentionMessageHandlerEnvelope; |
| | 0 | 87 | | _peerService.OnDisconnect += PeerDisconnectionEnvelope; |
| | 0 | 88 | | _peerService.OnExceptionRaised += ExceptionRaisedEnvelope; |
| | 0 | 89 | | _channelMemoryRepository.OnChannelUpdated += ChannelUpdatedHandlerEnvelope; |
| | | 90 | | |
| | 0 | 91 | | return await tsc.Task; |
| | | 92 | | } |
| | 0 | 93 | | catch |
| | | 94 | | { |
| | 0 | 95 | | if (!_channelMemoryRepository.TryGetChannel(_channelId, out channel) |
| | 0 | 96 | | || channel.State is ChannelState.ReadyForUs |
| | 0 | 97 | | or ChannelState.ReadyForThem |
| | 0 | 98 | | or ChannelState.Open |
| | 0 | 99 | | or ChannelState.V1FundingSigned) |
| | 0 | 100 | | throw; |
| | | 101 | | |
| | 0 | 102 | | _utxoMemoryRepository.ReturnUtxosNotSpentOnChannel(request.ChannelId); |
| | | 103 | | |
| | 0 | 104 | | throw; |
| | | 105 | | } |
| | | 106 | | finally |
| | | 107 | | { |
| | | 108 | | //Unsubscribe from the events so we don't have dangling memory |
| | 0 | 109 | | _peerService?.OnAttentionMessageReceived -= AttentionMessageHandlerEnvelope; |
| | 0 | 110 | | _peerService?.OnDisconnect -= PeerDisconnectionEnvelope; |
| | 0 | 111 | | _peerService?.OnExceptionRaised -= ExceptionRaisedEnvelope; |
| | 0 | 112 | | _channelMemoryRepository.OnChannelUpdated -= ChannelUpdatedHandlerEnvelope; |
| | | 113 | | } |
| | | 114 | | |
| | | 115 | | // Envelopes for the events |
| | | 116 | | void AttentionMessageHandlerEnvelope(object? _, AttentionMessageEventArgs args) => |
| | 0 | 117 | | HandleAttentionMessage(args, tsc); |
| | | 118 | | |
| | | 119 | | void PeerDisconnectionEnvelope(object? _, PeerDisconnectedEventArgs args) => |
| | 0 | 120 | | HandlePeerDisconnection(args, channel.RemoteNodeId, tsc); |
| | | 121 | | |
| | | 122 | | void ExceptionRaisedEnvelope(object? _, Exception e) => |
| | 0 | 123 | | HandleExceptionRaised(e, tsc); |
| | | 124 | | |
| | | 125 | | void ChannelUpdatedHandlerEnvelope(object? _, ChannelUpdatedEventArgs args) => |
| | 0 | 126 | | HandleChannelUpdated(args, tsc); |
| | 0 | 127 | | } |
| | | 128 | | |
| | | 129 | | private void HandleAttentionMessage(AttentionMessageEventArgs args, |
| | | 130 | | TaskCompletionSource<OpenChannelClientSubscriptionResponse> tsc) |
| | | 131 | | { |
| | 0 | 132 | | if (args.ChannelId != _channelId) |
| | 0 | 133 | | return; |
| | | 134 | | |
| | 0 | 135 | | _logger.LogError( |
| | 0 | 136 | | "Received attention message from peer {peerId} for channel {channelId}: {message}", |
| | 0 | 137 | | args.PeerPubKey, args.ChannelId, args.Message); |
| | | 138 | | |
| | 0 | 139 | | tsc.TrySetException(new ChannelErrorException($"Error opening channel: {args.Message}")); |
| | 0 | 140 | | } |
| | | 141 | | |
| | | 142 | | private void HandlePeerDisconnection(PeerDisconnectedEventArgs args, CompactPubKey peerPubKey, |
| | | 143 | | TaskCompletionSource<OpenChannelClientSubscriptionResponse> tsc) |
| | | 144 | | { |
| | 0 | 145 | | if (args.PeerPubKey != peerPubKey) |
| | 0 | 146 | | return; |
| | | 147 | | |
| | 0 | 148 | | if (args.Exception is null) |
| | | 149 | | { |
| | 0 | 150 | | _logger.LogError("Peer disconnected without notice"); |
| | 0 | 151 | | tsc.TrySetException(new ConnectionException("Error opening channel: Peer disconnected")); |
| | | 152 | | } |
| | | 153 | | else |
| | | 154 | | { |
| | | 155 | | // Get to the bottom of the inner exceptions to fetch the real reason for the disconnection |
| | 0 | 156 | | var exception = args.Exception; |
| | 0 | 157 | | while (exception.InnerException is not null) |
| | 0 | 158 | | exception = exception.InnerException; |
| | | 159 | | |
| | 0 | 160 | | _logger.LogError(args.Exception, "Error opening channel. Error: {message}", exception.Message); |
| | 0 | 161 | | tsc.TrySetException(new ChannelErrorException($"Error opening channel: {exception.Message}", exception)); |
| | | 162 | | } |
| | 0 | 163 | | } |
| | | 164 | | |
| | | 165 | | private void HandleExceptionRaised(Exception e, TaskCompletionSource<OpenChannelClientSubscriptionResponse> tsc) |
| | | 166 | | { |
| | 0 | 167 | | if (e is not ChannelErrorException ce || ce.ChannelId != _channelId) |
| | 0 | 168 | | return; |
| | | 169 | | |
| | 0 | 170 | | _logger.LogError("Exception raised while opening channel: {message}", e.Message); |
| | 0 | 171 | | tsc.TrySetException(e); |
| | 0 | 172 | | } |
| | | 173 | | |
| | | 174 | | private void HandleChannelUpdated(ChannelUpdatedEventArgs args, |
| | | 175 | | TaskCompletionSource<OpenChannelClientSubscriptionResponse> tsc) |
| | | 176 | | { |
| | 0 | 177 | | if (args.Channel.ChannelId != _channelId) |
| | 0 | 178 | | return; |
| | | 179 | | |
| | 0 | 180 | | if (args.Channel.State == ChannelState.V1FundingSigned) |
| | | 181 | | { |
| | 0 | 182 | | tsc.TrySetResult(new OpenChannelClientSubscriptionResponse(args.Channel.ChannelId) |
| | 0 | 183 | | { |
| | 0 | 184 | | ChannelState = ChannelState.V1FundingSigned, |
| | 0 | 185 | | TxId = args.Channel.FundingOutput?.TransactionId, |
| | 0 | 186 | | Index = args.Channel.FundingOutput?.Index |
| | 0 | 187 | | }); |
| | | 188 | | } |
| | 0 | 189 | | else if (args.Channel.State is ChannelState.ReadyForUs or ChannelState.ReadyForThem) |
| | | 190 | | { |
| | 0 | 191 | | tsc.TrySetResult(new OpenChannelClientSubscriptionResponse(args.Channel.ChannelId) |
| | 0 | 192 | | { |
| | 0 | 193 | | ChannelState = ChannelState.ReadyForUs, |
| | 0 | 194 | | TxId = args.Channel.FundingOutput?.TransactionId, |
| | 0 | 195 | | Index = args.Channel.FundingOutput?.Index |
| | 0 | 196 | | }); |
| | | 197 | | } |
| | 0 | 198 | | } |
| | | 199 | | } |