| | | 1 | | using System.Collections.Concurrent; |
| | | 2 | | using Microsoft.Extensions.DependencyInjection; |
| | | 3 | | using Microsoft.Extensions.Logging; |
| | | 4 | | using Microsoft.Extensions.Options; |
| | | 5 | | using NBitcoin; |
| | | 6 | | using NetMQ; |
| | | 7 | | using NetMQ.Sockets; |
| | | 8 | | |
| | | 9 | | namespace NLightning.Infrastructure.Bitcoin.Wallet; |
| | | 10 | | |
| | | 11 | | using Domain.Bitcoin.Events; |
| | | 12 | | using Domain.Bitcoin.Interfaces; |
| | | 13 | | using Domain.Bitcoin.Transactions.Models; |
| | | 14 | | using Domain.Bitcoin.ValueObjects; |
| | | 15 | | using Domain.Bitcoin.Wallet.Models; |
| | | 16 | | using Domain.Channels.ValueObjects; |
| | | 17 | | using Domain.Crypto.ValueObjects; |
| | | 18 | | using Domain.Money; |
| | | 19 | | using Domain.Node.Options; |
| | | 20 | | using Domain.Persistence.Interfaces; |
| | | 21 | | using Interfaces; |
| | | 22 | | using Options; |
| | | 23 | | |
/// <summary>
/// Follows the Bitcoin chain through a ZMQ <c>rawblock</c> subscription plus the chain service, and raises events
/// when a new block is processed, when a watched transaction reaches its required depth, and when an output
/// paying a watched wallet address is found in a block.
/// </summary>
public class BlockchainMonitorService : IBlockchainMonitor
{
    private readonly BitcoinOptions _bitcoinOptions;
    private readonly IBitcoinChainService _bitcoinChainService;
    private readonly ILogger<BlockchainMonitorService> _logger;
    private readonly IServiceProvider _serviceProvider;
    private readonly Network _network;

    // Serializes handling of blocks received from the ZMQ socket.
    private readonly SemaphoreSlim _newBlockSemaphore = new(1, 1);

    // Serializes draining of the _blocksToProcess backlog.
    private readonly SemaphoreSlim _blockBacklogSemaphore = new(1, 1);

    private readonly ConcurrentDictionary<uint256, WatchedTransactionModel> _watchedTransactions = new();
    private readonly ConcurrentDictionary<string, WalletAddressModel> _watchedAddresses = new();

    // Blocks waiting to be processed, keyed by height. Enumeration follows insertion order, and every code path
    // in this class inserts heights in ascending order.
    private readonly OrderedDictionary<uint, Block> _blocksToProcess = new();

    private BlockchainState _blockchainState = new(0, Hash.Empty, DateTime.UtcNow);
    private CancellationTokenSource? _cts;
    private Task? _monitoringTask;
    private uint _lastProcessedBlockHeight;
    private SubscriberSocket? _blockSocket;
    // private SubscriberSocket? _transactionSocket;

    /// <summary>Raised for every block handed to the block processor, before watched items are checked.</summary>
    public event EventHandler<NewBlockEventArgs>? OnNewBlockDetected;

    /// <summary>Raised when a watched transaction has reached its required depth.</summary>
    public event EventHandler<TransactionConfirmedEventArgs>? OnTransactionConfirmed;

    /// <summary>Raised when a block contains an output paying one of the watched addresses.</summary>
    public event EventHandler<WalletMovementEventArgs>? OnWalletMovementDetected;

    /// <summary>Height of the last block that was processed successfully.</summary>
    public uint LastProcessedBlockHeight => _lastProcessedBlockHeight;

    /// <summary>
    /// Creates the monitor. The NBitcoin network is resolved from <see cref="NodeOptions"/> and falls back to
    /// <see cref="Network.Main"/> when the configured name is not recognised.
    /// </summary>
    public BlockchainMonitorService(IOptions<BitcoinOptions> bitcoinOptions, IBitcoinChainService bitcoinChainService,
                                    ILogger<BlockchainMonitorService> logger, IOptions<NodeOptions> nodeOptions,
                                    IServiceProvider serviceProvider)
    {
        _bitcoinOptions = bitcoinOptions.Value;
        _bitcoinChainService = bitcoinChainService;
        _logger = logger;
        _serviceProvider = serviceProvider;
        _network = Network.GetNetwork(nodeOptions.Value.BitcoinNetwork) ?? Network.Main;
    }

    /// <summary>
    /// Loads persisted state (pending watched transactions, wallet addresses, unspent outputs, blockchain state),
    /// catches up on blocks produced while the service was not running, then opens the ZMQ subscription and
    /// starts the monitoring loop.
    /// </summary>
    /// <param name="heightOfBirth">Height to start from when no blockchain state has been persisted yet.</param>
    /// <param name="cancellationToken">Token linked to the monitoring loop; cancelling it stops the loop.</param>
    public async Task StartAsync(uint heightOfBirth, CancellationToken cancellationToken)
    {
        _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        using var scope = _serviceProvider.CreateScope();
        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();

        // Load pending transactions
        await LoadPendingWatchedTransactionsAsync(uow);

        // Load existing addresses
        LoadBitcoinAddresses(uow);

        // Load UtxoSet
        await LoadUtxoSetAsync(uow);

        // Get the current state or create a new one if it doesn't exist
        var currentBlockchainState = await uow.BlockchainStateDbRepository.GetStateAsync();
        if (currentBlockchainState is null)
        {
            _logger.LogInformation("No blockchain state found, starting from height {Height}", heightOfBirth);

            _lastProcessedBlockHeight = heightOfBirth;
            _blockchainState = new BlockchainState(_lastProcessedBlockHeight, Hash.Empty, DateTime.UtcNow);
            uow.BlockchainStateDbRepository.Add(_blockchainState);
        }
        else
        {
            _blockchainState = currentBlockchainState;
            _lastProcessedBlockHeight = _blockchainState.LastProcessedHeight;
            _logger.LogInformation("Starting blockchain monitoring at height {Height}, last block hash {LastBlockHash}",
                                   _lastProcessedBlockHeight, _blockchainState.LastProcessedBlockHash);
        }

        // Get the current block height from the wallet
        var currentBlockHeight = await _bitcoinChainService.GetCurrentBlockHeightAsync();

        if (currentBlockHeight > _lastProcessedBlockHeight)
        {
            // Re-queue the block at the last processed height, so it is processed again on every start
            var currentBlock = await _bitcoinChainService.GetBlockAsync(_lastProcessedBlockHeight);
            if (currentBlock is not null)
                _blocksToProcess[_lastProcessedBlockHeight] = currentBlock;

            // Add missing blocks to the processing queue and process any pending blocks.
            // NOTE(review): the back-fill stops one block short of currentBlockHeight, so the chain tip is only
            // processed once the next block notification arrives - confirm this is intended.
            await AddMissingBlocksToProcessAsync(currentBlockHeight);
            await ProcessPendingBlocksAsync(uow);
        }

        await uow.SaveChangesAsync();

        // Initialize ZMQ sockets
        InitializeZmqSockets();

        // Start monitoring task
        _monitoringTask = MonitorBlockchainAsync(_cts.Token);

        _logger.LogInformation("Blockchain monitor service started successfully");
    }

    /// <summary>
    /// Cancels the monitoring loop, waits for it to finish, closes the ZMQ socket and releases the linked
    /// cancellation source.
    /// </summary>
    /// <exception cref="InvalidOperationException">The service is not running.</exception>
    public async Task StopAsync()
    {
        if (_cts is null)
            throw new InvalidOperationException("Service is not running");

        await _cts.CancelAsync();

        if (_monitoringTask is not null)
        {
            try
            {
                await _monitoringTask;
            }
            catch (OperationCanceledException)
            {
                // Expected during cancellation
            }
        }

        CleanupZmqSockets();

        // A linked source keeps a registration on its parent token until it is disposed.
        _cts.Dispose();
        _cts = null;
        _monitoringTask = null;
    }

    /// <summary>
    /// Registers the transaction to be watched and then broadcasts it through the chain service.
    /// </summary>
    /// <param name="channelId">Channel the transaction belongs to.</param>
    /// <param name="signedTransaction">Signed transaction to broadcast.</param>
    /// <param name="requiredDepth">Number of confirmations to wait for.</param>
    public async Task PublishAndWatchTransactionAsync(ChannelId channelId, SignedTransaction signedTransaction,
                                                      uint requiredDepth)
    {
        if (_logger.IsEnabled(LogLevel.Information))
            _logger.LogInformation(
                "Publishing transaction {TxId} for {RequiredDepth} confirmations for channel {channelId}",
                signedTransaction.TxId, requiredDepth, channelId);

        // Convert the tx
        var transaction = Transaction.Load(signedTransaction.RawTxBytes, _network);

        // Start watching the tx
        await WatchTransactionAsync(channelId, signedTransaction.TxId, requiredDepth);

        // Publish the tx
        await _bitcoinChainService.SendTransactionAsync(transaction);
    }

    /// <summary>
    /// Persists a watch entry for the transaction and adds it to the in-memory watch list.
    /// </summary>
    /// <param name="channelId">Channel the transaction belongs to.</param>
    /// <param name="txId">Id of the transaction to watch.</param>
    /// <param name="requiredDepth">Number of confirmations to wait for.</param>
    public async Task WatchTransactionAsync(ChannelId channelId, TxId txId, uint requiredDepth)
    {
        if (_logger.IsEnabled(LogLevel.Information))
            _logger.LogInformation(
                "Watching transaction {TxId} for {RequiredDepth} confirmations for channel {channelId}",
                txId, requiredDepth, channelId);

        using var scope = _serviceProvider.CreateScope();
        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();

        var nBitcoinTxId = new uint256(txId);
        var watchedTx = new WatchedTransactionModel(channelId, txId, requiredDepth);

        uow.WatchedTransactionDbRepository.Add(watchedTx);

        _watchedTransactions[nBitcoinTxId] = watchedTx;

        await uow.SaveChangesAsync();
    }

    /// <summary>Adds (or replaces) a wallet address in the in-memory list of addresses watched for deposits.</summary>
    /// <param name="walletAddress">Address to watch.</param>
    public void WatchBitcoinAddress(WalletAddressModel walletAddress)
    {
        if (_logger.IsEnabled(LogLevel.Information))
            _logger.LogInformation("Watching bitcoin address {walletAddress} for deposits", walletAddress);

        _watchedAddresses[walletAddress.Address] = walletAddress;
    }

    // public Task WatchForRevocationAsync(TxId commitmentTxId, SignedTransaction penaltyTx)
    // {
    //     _logger.LogInformation("Watching for revocation of commitment transaction {CommitmentTxId}", commitmentTxId);
    //
    //     var nBitcoinTxId = new uint256(commitmentTxId);
    //     var revocationWatch = new RevocationWatch(nBitcoinTxId, Transaction.Load(penaltyTx.RawTxBytes, _network));
    //
    //     _revocationWatches.TryAdd(nBitcoinTxId, revocationWatch);
    //     return Task.CompletedTask;
    // }

    /// <summary>
    /// Polls the ZMQ block socket until cancellation. Errors in a single iteration are logged and followed by a
    /// five-second pause; they never terminate the loop.
    /// </summary>
    private async Task MonitorBlockchainAsync(CancellationToken cancellationToken)
    {
        if (_logger.IsEnabled(LogLevel.Information))
            _logger.LogInformation("Starting blockchain monitoring loop");

        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    // Check for new blocks
                    if (_blockSocket is not null &&
                        _blockSocket.TryReceiveFrameString(TimeSpan.FromMilliseconds(100), out var topic) &&
                        topic == "rawblock" &&
                        _blockSocket.TryReceiveFrameBytes(out var rawBlockBytes))
                    {
                        await HandleRawBlockAsync(rawBlockBytes, cancellationToken);
                    }

                    // TODO: Check for new transactions
                    // if (_transactionSocket != null &&
                    //     _transactionSocket.TryReceiveFrameString(TimeSpan.FromMilliseconds(100), out var txTopic))
                    // {
                    //     if (txTopic == "rawtx" && _transactionSocket.TryReceiveFrameBytes(out var rawTxBytes))
                    //     {
                    //         await ProcessNewTransaction(rawTxBytes);
                    //     }
                    // }

                    // Small delay to prevent CPU spinning
                    await Task.Delay(50, cancellationToken);
                }
                catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
                {
                    _logger.LogError(ex, "Error in blockchain monitoring loop");
                    await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
                }
            }
        }
        catch (OperationCanceledException)
        {
            if (_logger.IsEnabled(LogLevel.Information))
                _logger.LogInformation("Blockchain monitoring loop cancelled");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Fatal error in blockchain monitoring loop");
        }
    }

    /// <summary>
    /// Deserializes a raw block received from ZMQ, resolves its height and hands it to the block processor.
    /// Only one block is handled at a time.
    /// </summary>
    /// <param name="rawBlockBytes">Serialized block as received on the <c>rawblock</c> topic.</param>
    /// <param name="cancellationToken">Token used while waiting for the semaphore.</param>
    private async Task HandleRawBlockAsync(byte[] rawBlockBytes, CancellationToken cancellationToken)
    {
        // Acquire before entering the try block: if the wait is cancelled, the semaphore was never taken and
        // must not be released.
        await _newBlockSemaphore.WaitAsync(cancellationToken);
        try
        {
            var block = Block.Load(rawBlockBytes, _network);
            var coinbaseHeight = block.GetCoinbaseHeight();
            if (!coinbaseHeight.HasValue)
            {
                // Get the current height from the wallet
                var currentHeight = await _bitcoinChainService.GetCurrentBlockHeightAsync();

                // Get the block from the wallet
                var blockAtHeight = await _bitcoinChainService.GetBlockAsync(currentHeight);
                if (blockAtHeight is null)
                {
                    // Skip only this notification and keep the loop alive; the block is back-filled as a missed
                    // block when the next notification is handled.
                    _logger.LogError("Failed to retrieve block at height {Height}", currentHeight);
                    return;
                }

                coinbaseHeight = (int)currentHeight;
            }

            await ProcessNewBlock(block, (uint)coinbaseHeight.Value);
        }
        finally
        {
            _newBlockSemaphore.Release();
        }
    }

    /// <summary>Opens the ZMQ subscriber socket and subscribes to the <c>rawblock</c> topic.</summary>
    private void InitializeZmqSockets()
    {
        try
        {
            // Subscribe to new blocks
            _blockSocket = new SubscriberSocket();
            _blockSocket.Connect($"tcp://{_bitcoinOptions.ZmqHost}:{_bitcoinOptions.ZmqBlockPort}");
            _blockSocket.Subscribe("rawblock");

            // // Subscribe to new transactions (for mempool monitoring)
            // _transactionSocket = new SubscriberSocket();
            // _transactionSocket.Connect($"tcp://{_bitcoinOptions.ZmqHost}:{_bitcoinOptions.ZmqTxPort}");
            // _transactionSocket.Subscribe("rawtx");

            if (_logger.IsEnabled(LogLevel.Information))
                _logger.LogInformation("ZMQ sockets initialized - Block: {BlockPort}, Tx: {TxPort}",
                                       _bitcoinOptions.ZmqBlockPort, _bitcoinOptions.ZmqTxPort);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to initialize ZMQ sockets");
            CleanupZmqSockets();
            throw;
        }
    }

    /// <summary>Disposes the ZMQ socket. Failures are logged and never thrown.</summary>
    private void CleanupZmqSockets()
    {
        try
        {
            _blockSocket?.Dispose();
            _blockSocket = null;

            // _transactionSocket?.Dispose();
            // _transactionSocket = null;

            _logger.LogDebug("ZMQ sockets cleaned up");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error cleaning up ZMQ sockets");
        }
    }

    /// <summary>
    /// Drains the backlog of queued blocks in queue order. Draining stops at the first block that fails to
    /// process; that block stays queued and is retried the next time the backlog is drained.
    /// </summary>
    private async Task ProcessPendingBlocksAsync(IUnitOfWork uow)
    {
        // Acquire before entering the try block so the finally never releases a semaphore that was not taken.
        await _blockBacklogSemaphore.WaitAsync();
        try
        {
            while (_blocksToProcess.Count > 0)
            {
                var (height, block) = _blocksToProcess.First();
                if (height <= _lastProcessedBlockHeight)
                    _logger.LogWarning("Possible reorg detected: Block {Height} is already processed.", height);

                if (ProcessBlock(block, height, uow))
                    continue;

                // A failed block is not removed from the queue; without this exit the loop would spin forever
                // on the same block.
                _logger.LogWarning("Stopped draining pending blocks: block {Height} could not be processed",
                                   height);
                break;
            }
        }
        finally
        {
            _blockBacklogSemaphore.Release();
        }
    }

    /// <summary>
    /// Queues every block between the last processed height (exclusive) and <paramref name="currentHeight"/>
    /// (exclusive) that is not queued yet. Blocks that cannot be fetched are logged and skipped.
    /// </summary>
    private async Task AddMissingBlocksToProcessAsync(uint currentHeight)
    {
        var firstMissingHeight = _lastProcessedBlockHeight + 1;
        if (currentHeight <= firstMissingHeight)
            return;

        _logger.LogWarning("Processing missed blocks from height {LastProcessedHeight} to {CurrentHeight}",
                           firstMissingHeight, currentHeight);

        for (var height = firstMissingHeight; height < currentHeight; height++)
        {
            if (_blocksToProcess.ContainsKey(height))
                continue;

            // Add the missing block to the process queue
            var blockAtHeight = await _bitcoinChainService.GetBlockAsync(height);
            if (blockAtHeight is not null)
                _blocksToProcess[height] = blockAtHeight;
            else
                _logger.LogError("Missing block at height {Height}", height);
        }
    }

    /// <summary>
    /// Handles a block announced through ZMQ: back-fills missed blocks, queues the announced block, drains the
    /// queue and saves the unit of work. Processing errors are logged; the save is attempted regardless.
    /// </summary>
    private async Task ProcessNewBlock(Block block, uint currentHeight)
    {
        using var scope = _serviceProvider.CreateScope();
        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();

        var blockHash = block.GetHash();

        try
        {
            if (_logger.IsEnabled(LogLevel.Debug))
                _logger.LogDebug("Processing block at height {blockHeight}: {BlockHash}", currentHeight, blockHash);

            // Check for missed blocks first
            await AddMissingBlocksToProcessAsync(currentHeight);

            // Store the current block for processing
            _blocksToProcess[currentHeight] = block;

            // Process missing blocks
            await ProcessPendingBlocksAsync(uow);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing new block {BlockHash}", blockHash);
        }

        await uow.SaveChangesAsync();
    }

    // TODO: Check for revocation transactions in mempool
    // private async Task ProcessNewTransaction(byte[] rawTxBytes)
    // {
    //     try
    //     {
    //         var transaction = Transaction.Load(rawTxBytes, Network.Main);
    //     }
    //     catch (Exception ex)
    //     {
    //         _logger.LogError(ex, "Error processing new transaction from mempool");
    //     }
    // }

    /// <summary>
    /// Processes one block: raises <see cref="OnNewBlockDetected"/>, checks watched transactions and addresses,
    /// updates the blockchain state, removes the block from the queue and re-evaluates confirmation depths.
    /// </summary>
    /// <returns>
    /// <c>true</c> when the block was processed; <c>false</c> when an error was logged. The block is only
    /// removed from the queue after the state update, so an earlier failure leaves it queued.
    /// </returns>
    private bool ProcessBlock(Block block, uint height, IUnitOfWork uow)
    {
        try
        {
            var blockHash = block.GetHash();

            if (_logger.IsEnabled(LogLevel.Debug))
                _logger.LogDebug("Processing block {Height} with {TxCount} transactions", height,
                                 block.Transactions.Count);

            // Notify listeners of the new block
            OnNewBlockDetected?.Invoke(this, new NewBlockEventArgs(height, blockHash.ToBytes()));

            // Check if watched transactions are included in this block
            CheckBlockForWatchedTransactions(block.Transactions, height, uow);

            // Check for deposits in this block
            CheckBlockForWalletMovement(block.Transactions, height, uow);

            // Update blockchain state
            _blockchainState.UpdateState(blockHash.ToBytes(), height);
            uow.BlockchainStateDbRepository.Update(_blockchainState);

            _blocksToProcess.Remove(height);

            // Update our internal state
            _lastProcessedBlockHeight = height;

            // Check watched for all transactions' depth
            CheckWatchedTransactionsDepth(uow);

            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing block at height {Height}", height);
            return false;
        }
    }

    /// <summary>
    /// Marks the watched transaction as completed, raises <see cref="OnTransactionConfirmed"/> and removes it
    /// from the in-memory watch list.
    /// </summary>
    private void ConfirmTransaction(uint blockHeight, IUnitOfWork uow, WatchedTransactionModel watchedTransaction)
    {
        if (_logger.IsEnabled(LogLevel.Information))
            _logger.LogInformation(
                "Transaction {TxId} reached required depth of {depth} confirmations at block {blockHeight}",
                watchedTransaction.TransactionId, watchedTransaction.RequiredDepth, blockHeight);

        watchedTransaction.MarkAsCompleted();
        uow.WatchedTransactionDbRepository.Update(watchedTransaction);
        OnTransactionConfirmed?.Invoke(
            this, new TransactionConfirmedEventArgs(watchedTransaction, blockHeight));

        _watchedTransactions.TryRemove(new uint256(watchedTransaction.TransactionId), out _);
    }

    /// <summary>
    /// Looks for watched transactions in the block and records the height and the position inside the block
    /// where each one was found. Transactions requiring zero confirmations are confirmed immediately.
    /// </summary>
    private void CheckBlockForWatchedTransactions(List<Transaction> blockTransactions, uint blockHeight,
                                                  IUnitOfWork uow)
    {
        if (_logger.IsEnabled(LogLevel.Debug))
            _logger.LogDebug(
                "Checking {watchedTransactionCount} watched transactions for block {height} with {TxCount} transactions",
                _watchedTransactions.Count, blockHeight, blockTransactions.Count);

        for (var position = 0; position < blockTransactions.Count; position++)
        {
            var txId = blockTransactions[position].GetHash();

            if (!_watchedTransactions.TryGetValue(txId, out var watchedTransaction))
                continue;

            _logger.LogInformation("Transaction {TxId} found in block at height {Height}", txId, blockHeight);

            try
            {
                // The index is the position of the transaction inside the block, so it advances for every
                // transaction, watched or not.
                var index = (ushort)position;

                // Update first seen height
                watchedTransaction.SetHeightAndIndex(blockHeight, index);
                uow.WatchedTransactionDbRepository.Update(watchedTransaction);

                if (watchedTransaction.RequiredDepth == 0)
                    ConfirmTransaction(blockHeight, uow, watchedTransaction);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error checking confirmations for transaction {TxId}", txId);
            }
        }
    }

    /// <summary>
    /// Scans the block for outputs paying a watched address, recording a UTXO and raising
    /// <see cref="OnWalletMovementDetected"/> for each, and reports every input to the unit of work as a
    /// potential spend. A watched address is removed from the watch list after its first deposit.
    /// </summary>
    private void CheckBlockForWalletMovement(List<Transaction> transactions, uint blockHeight, IUnitOfWork uow)
    {
        // NOTE(review): this early return also skips the spent-UTXO check below when no address is watched -
        // confirm that is intended.
        if (_watchedAddresses.IsEmpty)
            return;

        if (_logger.IsEnabled(LogLevel.Debug))
            _logger.LogDebug("Checking {AddressCount} watched addresses for deposits/spends in block {Height}",
                             _watchedAddresses.Count, blockHeight);

        foreach (var transaction in transactions)
        {
            var txId = transaction.GetHash();

            // Check each output for deposits
            for (var i = 0; i < transaction.Outputs.Count; i++)
            {
                var output = transaction.Outputs[i];
                var destinationAddress = output.ScriptPubKey.GetDestinationAddress(_network);
                if (destinationAddress is null)
                    continue;

                var addressKey = destinationAddress.ToString();
                if (!_watchedAddresses.TryGetValue(addressKey, out var watchedAddress))
                    continue;

                if (_logger.IsEnabled(LogLevel.Information))
                    _logger.LogInformation(
                        "Deposit detected: {amount} to address {destinationAddress} in tx {txId} at block {height}",
                        output.Value, destinationAddress, txId, blockHeight);

                // Save Utxo to the database
                var utxo = new UtxoModel(txId.ToBytes(), (uint)i, LightningMoney.Satoshis(output.Value.Satoshi),
                                         blockHeight, watchedAddress);
                uow.AddUtxo(utxo);

                if (!_watchedAddresses.TryRemove(addressKey, out _))
                    _logger.LogError("Unable to remove watched address {DestinationAddress} from the list",
                                     destinationAddress);

                OnWalletMovementDetected
                    ?.Invoke(this, new WalletMovementEventArgs(addressKey,
                                                               LightningMoney.Satoshis(output.Value.Satoshi),
                                                               txId.ToBytes(),
                                                               blockHeight));
            }

            // Check each input for spent utxos
            foreach (var input in transaction.Inputs)
                uow.TrySpendUtxo(new TxId(input.PrevOut.Hash.ToBytes()), input.PrevOut.N);
        }
    }

    /// <summary>
    /// Confirms every watched transaction whose confirmation count, measured against the last processed height,
    /// has reached its required depth.
    /// </summary>
    private void CheckWatchedTransactionsDepth(IUnitOfWork uow)
    {
        foreach (var (txId, watchedTransaction) in _watchedTransactions)
        {
            try
            {
                // The FirstSeenAtHeight represents 1 confirmation, so we have to add 1.
                // NOTE(review): if FirstSeenAtHeight is a plain uint and can exceed the last processed height
                // (e.g. after a reorg), this subtraction wraps around - confirm against the model.
                var confirmations = _lastProcessedBlockHeight - watchedTransaction.FirstSeenAtHeight + 1;
                if (confirmations >= watchedTransaction.RequiredDepth)
                    ConfirmTransaction(_lastProcessedBlockHeight, uow, watchedTransaction);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error checking confirmations for transaction {TxId}", txId);
            }
        }
    }

    /// <summary>Fills the in-memory watch list with the pending watched transactions stored in the database.</summary>
    private async Task LoadPendingWatchedTransactionsAsync(IUnitOfWork uow)
    {
        _logger.LogInformation("Loading watched transactions from database");

        var watchedTransactions = await uow.WatchedTransactionDbRepository.GetAllPendingAsync();
        foreach (var watchedTransaction in watchedTransactions)
        {
            _watchedTransactions[new uint256(watchedTransaction.TransactionId)] = watchedTransaction;
        }
    }

    /// <summary>Fills the in-memory address watch list with every wallet address stored in the database.</summary>
    private void LoadBitcoinAddresses(IUnitOfWork uow)
    {
        _logger.LogInformation("Loading bitcoin addresses from database");

        var bitcoinAddresses = uow.WalletAddressesDbRepository.GetAllAddresses();
        foreach (var bitcoinAddress in bitcoinAddresses)
        {
            _watchedAddresses[bitcoinAddress.Address] = bitcoinAddress;
        }
    }

    /// <summary>
    /// Loads the unspent outputs from the database into the <see cref="IUtxoMemoryRepository"/>, when there are any.
    /// </summary>
    /// <exception cref="InvalidOperationException">The memory repository is not registered.</exception>
    private async Task LoadUtxoSetAsync(IUnitOfWork uow)
    {
        _logger.LogInformation("Loading Utxo set");

        var utxoSet = (await uow.UtxoDbRepository.GetUnspentAsync()).ToList();
        if (utxoSet.Count > 0)
        {
            var utxoMemoryRepository = _serviceProvider.GetService<IUtxoMemoryRepository>() ??
                                       throw new InvalidOperationException(
                                           $"Error getting required service {nameof(IUtxoMemoryRepository)}");
            utxoMemoryRepository.Load(utxoSet);
        }
    }
}