Summary - Combined Code Coverage

Information
Class: NLightning.Infrastructure.Bitcoin.Wallet.BlockchainMonitorService
Assembly: NLightning.Infrastructure.Bitcoin
File(s): /home/runner/work/NLightning/NLightning/src/NLightning.Infrastructure.Bitcoin/Wallet/BlockchainMonitorService.cs
Tag: 57_24045730253
Line coverage
40%
Covered lines: 154
Uncovered lines: 224
Coverable lines: 378
Total lines: 607
Line coverage: 40.7%
Branch coverage
32%
Covered branches: 52
Total branches: 162
Branch coverage: 32%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

File(s)

/home/runner/work/NLightning/NLightning/src/NLightning.Infrastructure.Bitcoin/Wallet/BlockchainMonitorService.cs

# | Line | Line coverage
 1using System.Collections.Concurrent;
 2using Microsoft.Extensions.DependencyInjection;
 3using Microsoft.Extensions.Logging;
 4using Microsoft.Extensions.Options;
 5using NBitcoin;
 6using NetMQ;
 7using NetMQ.Sockets;
 8
 9namespace NLightning.Infrastructure.Bitcoin.Wallet;
 10
 11using Domain.Bitcoin.Events;
 12using Domain.Bitcoin.Interfaces;
 13using Domain.Bitcoin.Transactions.Models;
 14using Domain.Bitcoin.ValueObjects;
 15using Domain.Bitcoin.Wallet.Models;
 16using Domain.Channels.ValueObjects;
 17using Domain.Crypto.ValueObjects;
 18using Domain.Money;
 19using Domain.Node.Options;
 20using Domain.Persistence.Interfaces;
 21using Interfaces;
 22using Options;
 23
 24public class BlockchainMonitorService : IBlockchainMonitor
 25{
 // Configuration and collaborators injected through the constructor.
 private readonly BitcoinOptions _bitcoinOptions;
 private readonly IBitcoinChainService _bitcoinChainService;
 private readonly ILogger<BlockchainMonitorService> _logger;
 private readonly IServiceProvider _serviceProvider;
 private readonly Network _network;

 // Held while a block received from the ZMQ socket is being handled (one at a time).
 private readonly SemaphoreSlim _newBlockSemaphore = new(1, 1);

 // Held while the queued blocks in _blocksToProcess are being drained.
 private readonly SemaphoreSlim _blockBacklogSemaphore = new(1, 1);

 // Transactions being watched for confirmations, keyed by their NBitcoin transaction hash.
 private readonly ConcurrentDictionary<uint256, WatchedTransactionModel> _watchedTransactions = new();

 // Wallet addresses being watched for deposits, keyed by the address string.
 private readonly ConcurrentDictionary<string, WalletAddressModel> _watchedAddresses = new();

 // Blocks waiting to be processed, keyed by block height.
 private readonly OrderedDictionary<uint, Block> _blocksToProcess = new();

 // Persisted chain-tracking state; replaced in StartAsync by the stored state or a fresh one.
 private BlockchainState _blockchainState = new(0, Hash.Empty, DateTime.UtcNow);

 // Created in StartAsync; null until the service has been started.
 private CancellationTokenSource? _cts;

 // The running MonitorBlockchainAsync loop, awaited by StopAsync.
 private Task? _monitoringTask;

 // Height of the last block handled by ProcessBlock.
 private uint _lastProcessedBlockHeight;

 // ZMQ subscriber for the "rawblock" topic.
 private SubscriberSocket? _blockSocket;
 // private SubscriberSocket? _transactionSocket;

 /// <summary>Raised by ProcessBlock for every block it handles.</summary>
 public event EventHandler<NewBlockEventArgs>? OnNewBlockDetected;

 /// <summary>Raised when a watched transaction is marked as completed.</summary>
 public event EventHandler<TransactionConfirmedEventArgs>? OnTransactionConfirmed;

 /// <summary>Raised when an output paying a watched address is found in a block.</summary>
 public event EventHandler<WalletMovementEventArgs>? OnWalletMovementDetected;

 /// <summary>Height of the last block this service has processed.</summary>
 public uint LastProcessedBlockHeight => _lastProcessedBlockHeight;
 049
 4050    public BlockchainMonitorService(IOptions<BitcoinOptions> bitcoinOptions, IBitcoinChainService bitcoinChainService,
 4051                                    ILogger<BlockchainMonitorService> logger, IOptions<NodeOptions> nodeOptions,
 4052                                    IServiceProvider serviceProvider)
 053    {
 4054        _bitcoinOptions = bitcoinOptions.Value;
 4055        _bitcoinChainService = bitcoinChainService;
 4056        _logger = logger;
 4057        _serviceProvider = serviceProvider;
 4058        _network = Network.GetNetwork(nodeOptions.Value.BitcoinNetwork) ?? Network.Main;
 4059    }
 60
 061    public async Task StartAsync(uint heightOfBirth, CancellationToken cancellationToken)
 62    {
 3263        _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 064
 3265        using var scope = _serviceProvider.CreateScope();
 3266        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
 067
 68        // Load pending transactions
 3269        await LoadPendingWatchedTransactionsAsync(uow);
 070
 071        // Load existing addresses
 3272        LoadBitcoinAddresses(uow);
 073
 74        // Load UtxoSet
 3275        await LoadUtxoSetAsync(uow);
 076
 077        // Get the current state or create a new one if it doesn't exist
 3278        var currentBlockchainState = await uow.BlockchainStateDbRepository.GetStateAsync();
 3279        if (currentBlockchainState is null)
 80        {
 2081            _logger.LogInformation("No blockchain state found, starting from height {Height}", heightOfBirth);
 082
 2083            _lastProcessedBlockHeight = heightOfBirth;
 2084            _blockchainState = new BlockchainState(_lastProcessedBlockHeight, Hash.Empty, DateTime.UtcNow);
 2085            uow.BlockchainStateDbRepository.Add(_blockchainState);
 86        }
 87        else
 088        {
 1289            _blockchainState = currentBlockchainState;
 1290            _lastProcessedBlockHeight = _blockchainState.LastProcessedHeight;
 1291            _logger.LogInformation("Starting blockchain monitoring at height {Height}, last block hash {LastBlockHash}",
 1292                                   _lastProcessedBlockHeight, _blockchainState.LastProcessedBlockHash);
 093        }
 94
 95        // Get the current block height from the wallet
 3296        var currentBlockHeight = await _bitcoinChainService.GetCurrentBlockHeightAsync();
 097
 3298        if (currentBlockHeight > _lastProcessedBlockHeight)
 099        {
 100            // Add the current block to the processing queue
 28101            var currentBlock = await _bitcoinChainService.GetBlockAsync(_lastProcessedBlockHeight);
 28102            if (currentBlock is not null)
 28103                _blocksToProcess[_lastProcessedBlockHeight] = currentBlock;
 104
 0105            // Add missing blocks to the processing queue and process any pending blocks
 28106            await AddMissingBlocksToProcessAsync(currentBlockHeight);
 28107            await ProcessPendingBlocksAsync(uow);
 0108        }
 109
 32110        await uow.SaveChangesAsync();
 111
 0112        // Initialize ZMQ sockets
 32113        InitializeZmqSockets();
 0114
 115        // Start monitoring task
 32116        _monitoringTask = MonitorBlockchainAsync(_cts.Token);
 0117
 32118        _logger.LogInformation("Blockchain monitor service started successfully");
 32119    }
 120
 121    public async Task StopAsync()
 122    {
 8123        if (_cts is null)
 0124            throw new InvalidOperationException("Service is not running");
 0125
 8126        await _cts.CancelAsync();
 127
 8128        if (_monitoringTask is not null)
 129        {
 130            try
 0131            {
 8132                await _monitoringTask;
 8133            }
 0134            catch (OperationCanceledException)
 135            {
 0136                // Expected during cancellation
 0137            }
 138        }
 0139
 8140        CleanupZmqSockets();
 8141    }
 0142
 0143    public async Task PublishAndWatchTransactionAsync(ChannelId channelId, SignedTransaction signedTransaction,
 144                                                      uint requiredDepth)
 0145    {
 0146        if (_logger.IsEnabled(LogLevel.Information))
 0147            _logger.LogInformation(
 0148                "Publishing transaction {TxId} for {RequiredDepth} confirmations for channel {channelId}",
 0149                signedTransaction.TxId, requiredDepth, channelId);
 0150
 151        // Convert the tx
 0152        var transaction = Transaction.Load(signedTransaction.RawTxBytes, _network);
 153
 154        // Start watching the tx
 0155        await WatchTransactionAsync(channelId, signedTransaction.TxId, requiredDepth);
 156
 157        // Publish the tx
 0158        await _bitcoinChainService.SendTransactionAsync(transaction);
 0159    }
 160
 161    public async Task WatchTransactionAsync(ChannelId channelId, TxId txId, uint requiredDepth)
 162    {
 4163        if (_logger.IsEnabled(LogLevel.Information))
 0164            _logger.LogInformation(
 0165                "Watching transaction {TxId} for {RequiredDepth} confirmations for channel {channelId}",
 0166                txId, requiredDepth, channelId);
 167
 4168        using var scope = _serviceProvider.CreateScope();
 4169        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
 170
 4171        var nBitcoinTxId = new uint256(txId);
 4172        var watchedTx = new WatchedTransactionModel(channelId, txId, requiredDepth);
 173
 4174        uow.WatchedTransactionDbRepository.Add(watchedTx);
 0175
 4176        _watchedTransactions[nBitcoinTxId] = watchedTx;
 0177
 4178        await uow.SaveChangesAsync();
 4179    }
 180
 181    public void WatchBitcoinAddress(WalletAddressModel walletAddress)
 0182    {
 0183        if (_logger.IsEnabled(LogLevel.Information))
 0184            _logger.LogInformation("Watching bitcoin address {walletAddress} for deposits", walletAddress);
 0185
 0186        _watchedAddresses[walletAddress.Address] = walletAddress;
 0187    }
 0188
 189    // public Task WatchForRevocationAsync(TxId commitmentTxId, SignedTransaction penaltyTx)
 190    // {
 0191    //     _logger.LogInformation("Watching for revocation of commitment transaction {CommitmentTxId}", commitmentTxId);
 0192    //
 193    //     var nBitcoinTxId = new uint256(commitmentTxId);
 0194    //     var revocationWatch = new RevocationWatch(nBitcoinTxId, Transaction.Load(penaltyTx.RawTxBytes, _network));
 0195    //
 196    //     _revocationWatches.TryAdd(nBitcoinTxId, revocationWatch);
 197    //     return Task.CompletedTask;
 0198    // }
 199
 200    private async Task MonitorBlockchainAsync(CancellationToken cancellationToken)
 0201    {
 32202        if (_logger.IsEnabled(LogLevel.Information))
 0203            _logger.LogInformation("Starting blockchain monitoring loop");
 204
 0205        try
 206        {
 104207            while (!cancellationToken.IsCancellationRequested)
 0208            {
 209                try
 210                {
 211                    // Check for new blocks
 104212                    if (_blockSocket != null &&
 104213                        _blockSocket.TryReceiveFrameString(TimeSpan.FromMilliseconds(100), out var topic))
 214                    {
 0215                        if (topic == "rawblock" && _blockSocket.TryReceiveFrameBytes(out var blockHashBytes))
 216                        {
 217                            try
 218                            {
 219                                // One at a time
 0220                                await _newBlockSemaphore.WaitAsync(cancellationToken);
 0221                                var block = Block.Load(blockHashBytes, _network);
 0222                                var coinbaseHeight = block.GetCoinbaseHeight();
 0223                                if (!coinbaseHeight.HasValue)
 224                                {
 0225                                    // Get the current height from the wallet
 0226                                    var currentHeight = await _bitcoinChainService.GetCurrentBlockHeightAsync();
 227
 228                                    // Get the block from the wallet
 0229                                    var blockAtHeight = await _bitcoinChainService.GetBlockAsync(currentHeight);
 0230                                    if (blockAtHeight is null)
 231                                    {
 0232                                        _logger.LogError("Failed to retrieve block at height {Height}", currentHeight);
 0233                                        return;
 0234                                    }
 235
 0236                                    coinbaseHeight = (int)currentHeight;
 0237                                }
 0238
 0239                                await ProcessNewBlock(block, (uint)coinbaseHeight);
 0240                            }
 241                            finally
 242                            {
 0243                                _newBlockSemaphore.Release();
 244                            }
 0245                        }
 0246                    }
 0247
 248                    // TODO: Check for new transactions
 249                    // if (_transactionSocket != null &&
 250                    //     _transactionSocket.TryReceiveFrameString(TimeSpan.FromMilliseconds(100), out var txTopic))
 251                    // {
 252                    //     if (txTopic == "rawtx" && _transactionSocket.TryReceiveFrameBytes(out var rawTxBytes))
 253                    //     {
 0254                    //         await ProcessNewTransaction(rawTxBytes);
 0255                    //     }
 0256                    // }
 0257
 258                    // Small delay to prevent CPU spinning
 96259                    await Task.Delay(50, cancellationToken);
 72260                }
 8261                catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
 262                {
 0263                    _logger.LogError(ex, "Error in blockchain monitoring loop");
 0264                    await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
 265                }
 266            }
 0267        }
 8268        catch (OperationCanceledException)
 0269        {
 8270            if (_logger.IsEnabled(LogLevel.Information))
 0271                _logger.LogInformation("Blockchain monitoring loop cancelled");
 8272        }
 0273        catch (Exception ex)
 274        {
 0275            _logger.LogError(ex, "Fatal error in blockchain monitoring loop");
 0276        }
 8277    }
 278
 0279    private void InitializeZmqSockets()
 0280    {
 0281        try
 282        {
 283            // Subscribe to new blocks
 32284            _blockSocket = new SubscriberSocket();
 32285            _blockSocket.Connect($"tcp://{_bitcoinOptions.ZmqHost}:{_bitcoinOptions.ZmqBlockPort}");
 32286            _blockSocket.Subscribe("rawblock");
 0287
 288            // // Subscribe to new transactions (for mempool monitoring)
 0289            // _transactionSocket = new SubscriberSocket();
 290            // _transactionSocket.Connect($"tcp://{_bitcoinOptions.ZmqHost}:{_bitcoinOptions.ZmqTxPort}");
 0291            // _transactionSocket.Subscribe("rawtx");
 0292
 32293            if (_logger.IsEnabled(LogLevel.Information))
 0294                _logger.LogInformation("ZMQ sockets initialized - Block: {BlockPort}, Tx: {TxPort}",
 0295                                       _bitcoinOptions.ZmqBlockPort, _bitcoinOptions.ZmqTxPort);
 32296        }
 0297        catch (Exception ex)
 298        {
 0299            _logger.LogError(ex, "Failed to initialize ZMQ sockets");
 0300            CleanupZmqSockets();
 0301            throw;
 0302        }
 32303    }
 304
 305    private void CleanupZmqSockets()
 0306    {
 0307        try
 308        {
 8309            _blockSocket?.Dispose();
 8310            _blockSocket = null;
 311
 0312            // _transactionSocket?.Dispose();
 313            // _transactionSocket = null;
 0314
 8315            _logger.LogDebug("ZMQ sockets cleaned up");
 8316        }
 0317        catch (Exception ex)
 0318        {
 0319            _logger.LogError(ex, "Error cleaning up ZMQ sockets");
 0320        }
 8321    }
 322
 323    private async Task ProcessPendingBlocksAsync(IUnitOfWork uow)
 324    {
 0325        try
 326        {
 32327            await _blockBacklogSemaphore.WaitAsync();
 328
 796329            while (_blocksToProcess.Count > 0)
 330            {
 764331                var blockKvp = _blocksToProcess.First();
 764332                if (blockKvp.Key <= _lastProcessedBlockHeight)
 28333                    _logger.LogWarning("Possible reorg detected: Block {Height} is already processed.", blockKvp.Key);
 0334
 764335                ProcessBlock(blockKvp.Value, blockKvp.Key, uow);
 0336            }
 32337        }
 338        finally
 339        {
 32340            _blockBacklogSemaphore.Release();
 341        }
 32342    }
 0343
 344    private async Task AddMissingBlocksToProcessAsync(uint currentHeight)
 345    {
 32346        var lastProcessedHeight = _lastProcessedBlockHeight + 1;
 32347        if (currentHeight > lastProcessedHeight)
 348        {
 28349            _logger.LogWarning("Processing missed blocks from height {LastProcessedHeight} to {CurrentHeight}",
 28350                               lastProcessedHeight, currentHeight);
 0351
 1520352            for (var height = lastProcessedHeight; height < currentHeight; height++)
 0353            {
 732354                if (_blocksToProcess.ContainsKey(height))
 355                    continue;
 0356
 0357                // Add the missing block to the process queue
 732358                var blockAtHeight = await _bitcoinChainService.GetBlockAsync(height);
 732359                if (blockAtHeight is not null)
 360                {
 732361                    _blocksToProcess[height] = blockAtHeight;
 362                }
 363                else
 364                {
 0365                    _logger.LogError("Missing block at height {Height}", height);
 366                }
 367            }
 368        }
 32369    }
 370
 371    private async Task ProcessNewBlock(Block block, uint currentHeight)
 372    {
 4373        using var scope = _serviceProvider.CreateScope();
 4374        using var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
 375
 4376        var blockHash = block.GetHash();
 377
 0378        try
 379        {
 4380            if (_logger.IsEnabled(LogLevel.Debug))
 0381                _logger.LogDebug("Processing block at height {blockHeight}: {BlockHash}", currentHeight, blockHash);
 382
 383            // Check for missed blocks first
 4384            await AddMissingBlocksToProcessAsync(currentHeight);
 385
 386            // Store the current block for processing
 4387            _blocksToProcess[currentHeight] = block;
 0388
 389            // Process missing blocks
 4390            await ProcessPendingBlocksAsync(uow);
 4391        }
 0392        catch (Exception ex)
 0393        {
 0394            _logger.LogError(ex, "Error processing new block {BlockHash}", blockHash);
 0395        }
 0396
 4397        await uow.SaveChangesAsync();
 4398    }
 399
 0400    // TODO: Check for revocation transactions in mempool
 0401    // private async Task ProcessNewTransaction(byte[] rawTxBytes)
 0402    // {
 403    //     try
 404    //     {
 405    //         var transaction = Transaction.Load(rawTxBytes, Network.Main);
 0406    //     }
 0407    //     catch (Exception ex)
 0408    //     {
 409    //         _logger.LogError(ex, "Error processing new transaction from mempool");
 0410    //     }
 0411    // }
 0412
 0413    private void ProcessBlock(Block block, uint height, IUnitOfWork uow)
 414    {
 0415        try
 0416        {
 764417            var blockHash = block.GetHash();
 418
 764419            if (_logger.IsEnabled(LogLevel.Debug))
 0420                _logger.LogDebug("Processing block {Height} with {TxCount} transactions", height,
 0421                                 block.Transactions.Count);
 0422
 0423            // Notify listeners of the new block
 764424            OnNewBlockDetected?.Invoke(this, new NewBlockEventArgs(height, blockHash.ToBytes()));
 0425
 0426            // Check if watched transactions are included in this block
 764427            CheckBlockForWatchedTransactions(block.Transactions, height, uow);
 0428
 429            // Check for deposits in this block
 764430            CheckBlockForWalletMovement(block.Transactions, height, uow);
 431
 432            // Update blockchain state
 764433            _blockchainState.UpdateState(blockHash.ToBytes(), height);
 764434            uow.BlockchainStateDbRepository.Update(_blockchainState);
 435
 764436            _blocksToProcess.Remove(height);
 437
 0438            // Update our internal state
 764439            _lastProcessedBlockHeight = height;
 440
 0441            // Check watched for all transactions' depth
 764442            CheckWatchedTransactionsDepth(uow);
 764443        }
 0444        catch (Exception ex)
 445        {
 0446            _logger.LogError(ex, "Error processing block at height {Height}", height);
 0447        }
 764448    }
 449
 0450    private void ConfirmTransaction(uint blockHeight, IUnitOfWork uow, WatchedTransactionModel watchedTransaction)
 0451    {
 4452        if (_logger.IsEnabled(LogLevel.Information))
 0453            _logger.LogInformation(
 0454                "Transaction {TxId} reached required depth of {depth} confirmations at block {blockHeight}",
 0455                watchedTransaction.TransactionId, watchedTransaction.RequiredDepth, blockHeight);
 456
 4457        watchedTransaction.MarkAsCompleted();
 4458        uow.WatchedTransactionDbRepository.Update(watchedTransaction);
 4459        OnTransactionConfirmed?.Invoke(
 4460            this, new TransactionConfirmedEventArgs(watchedTransaction, blockHeight));
 0461
 4462        _watchedTransactions.TryRemove(new uint256(watchedTransaction.TransactionId), out _);
 4463    }
 0464
 0465    private void CheckBlockForWatchedTransactions(List<Transaction> blockTransactions, uint blockHeight,
 466                                                  IUnitOfWork uow)
 0467    {
 764468        if (_logger.IsEnabled(LogLevel.Debug))
 0469            _logger.LogDebug(
 0470                "Checking {watchedTransactionCount} watched transactions for block {height} with {TxCount} transactions"
 0471                _watchedTransactions.Count, blockHeight, blockTransactions.Count);
 472
 764473        ushort index = 0;
 1528474        foreach (var transaction in blockTransactions)
 475        {
 0476            var txId = transaction.GetHash();
 0477
 0478            if (!_watchedTransactions.TryGetValue(txId, out var watchedTransaction))
 0479                continue;
 480
 0481            _logger.LogInformation("Transaction {TxId} found in block at height {Height}", txId, blockHeight);
 482
 483            try
 484            {
 485                // Update first seen height
 0486                watchedTransaction.SetHeightAndIndex(blockHeight, index);
 0487                uow.WatchedTransactionDbRepository.Update(watchedTransaction);
 488
 0489                if (watchedTransaction.RequiredDepth == 0)
 0490                    ConfirmTransaction(blockHeight, uow, watchedTransaction);
 0491            }
 0492            catch (Exception ex)
 493            {
 0494                _logger.LogError(ex, "Error checking confirmations for transaction {TxId}", txId);
 0495            }
 496            finally
 497            {
 0498                index++;
 0499            }
 500        }
 764501    }
 502
 503    private void CheckBlockForWalletMovement(List<Transaction> transactions, uint blockHeight, IUnitOfWork uow)
 504    {
 764505        if (_watchedAddresses.IsEmpty)
 764506            return;
 507
 0508        if (_logger.IsEnabled(LogLevel.Debug))
 0509            _logger.LogDebug("Checking {AddressCount} watched addresses for deposits/spends in block {Height}",
 0510                             _watchedAddresses.Count, blockHeight);
 511
 0512        foreach (var transaction in transactions)
 513        {
 0514            var txId = transaction.GetHash();
 515
 516            // Check each output for deposits
 0517            for (var i = 0; i < transaction.Outputs.Count; i++)
 518            {
 0519                var output = transaction.Outputs[i];
 0520                var destinationAddress = output.ScriptPubKey.GetDestinationAddress(_network);
 0521                if (destinationAddress == null)
 522                    continue;
 523
 0524                if (!_watchedAddresses.TryGetValue(destinationAddress.ToString(), out var watchedAddress))
 525                    continue;
 526
 0527                if (_logger.IsEnabled(LogLevel.Information))
 0528                    _logger.LogInformation(
 0529                        "Deposit detected: {amount} to address {destinationAddress} in tx {txId} at block {height}",
 0530                        output.Value, destinationAddress, txId, blockHeight);
 531
 532                // Save Utxo to the database
 0533                var utxo = new UtxoModel(txId.ToBytes(), (uint)i, LightningMoney.Satoshis(output.Value.Satoshi),
 0534                                         blockHeight, watchedAddress);
 0535                uow.AddUtxo(utxo);
 536
 0537                if (!_watchedAddresses.TryRemove(destinationAddress.ToString(), out _))
 0538                    _logger.LogError("Unable to remove watched address {DestinationAddress} from the list",
 0539                                     destinationAddress);
 540
 0541                OnWalletMovementDetected
 0542                  ?.Invoke(this, new WalletMovementEventArgs(destinationAddress.ToString(),
 0543                                                             LightningMoney.Satoshis(output.Value.Satoshi),
 0544                                                             txId.ToBytes(),
 0545                                                             blockHeight));
 546            }
 547
 548            // Check each input for spent utxos
 0549            foreach (var input in transaction.Inputs)
 0550                uow.TrySpendUtxo(new TxId(input.PrevOut.Hash.ToBytes()), input.PrevOut.N);
 551        }
 0552    }
 553
 554    private void CheckWatchedTransactionsDepth(IUnitOfWork uow)
 555    {
 1624556        foreach (var (txId, watchedTransaction) in _watchedTransactions)
 557        {
 558            try
 559            {
 560                // The FirstSeenAtHeight represents 1 confirmation, so we have to add 1
 44561                var confirmations = _lastProcessedBlockHeight - watchedTransaction.FirstSeenAtHeight + 1;
 44562                if (confirmations >= watchedTransaction.RequiredDepth)
 4563                    ConfirmTransaction(_lastProcessedBlockHeight, uow, watchedTransaction);
 44564            }
 0565            catch (Exception ex)
 566            {
 0567                _logger.LogError(ex, "Error checking confirmations for transaction {TxId}", txId);
 0568            }
 569        }
 768570    }
 571
 572    private async Task LoadPendingWatchedTransactionsAsync(IUnitOfWork uow)
 573    {
 32574        _logger.LogInformation("Loading watched transactions from database");
 575
 32576        var watchedTransactions = await uow.WatchedTransactionDbRepository.GetAllPendingAsync();
 72577        foreach (var watchedTransaction in watchedTransactions)
 578        {
 4579            _watchedTransactions[new uint256(watchedTransaction.TransactionId)] = watchedTransaction;
 580        }
 32581    }
 582
 583    private void LoadBitcoinAddresses(IUnitOfWork uow)
 584    {
 32585        _logger.LogInformation("Loading bitcoin addresses from database");
 586
 32587        var bitcoinAddresses = uow.WalletAddressesDbRepository.GetAllAddresses();
 64588        foreach (var bitcoinAddress in bitcoinAddresses)
 589        {
 0590            _watchedAddresses[bitcoinAddress.Address] = bitcoinAddress;
 591        }
 32592    }
 593
 594    private async Task LoadUtxoSetAsync(IUnitOfWork uow)
 595    {
 32596        _logger.LogInformation("Loading Utxo set");
 597
 32598        var utxoSet = (await uow.UtxoDbRepository.GetUnspentAsync()).ToList();
 32599        if (utxoSet.Count > 0)
 600        {
 0601            var utxoMemoryRepository = _serviceProvider.GetService<IUtxoMemoryRepository>() ??
 0602                                       throw new InvalidOperationException(
 0603                                           $"Error getting required service {nameof(IUtxoMemoryRepository)}");
 0604            utxoMemoryRepository.Load(utxoSet);
 605        }
 32606    }
 607}

Methods/Properties

.ctor(Microsoft.Extensions.Options.IOptions`1<NLightning.Infrastructure.Bitcoin.Options.BitcoinOptions>,NLightning.Infrastructure.Bitcoin.Wallet.Interfaces.IBitcoinWallet,Microsoft.Extensions.Logging.ILogger`1<NLightning.Infrastructure.Bitcoin.Wallet.BlockchainMonitorService>,Microsoft.Extensions.Options.IOptions`1<NLightning.Domain.Node.Options.NodeOptions>,System.IServiceProvider)
.ctor(Microsoft.Extensions.Options.IOptions`1<NLightning.Infrastructure.Bitcoin.Options.BitcoinOptions>,NLightning.Infrastructure.Bitcoin.Wallet.Interfaces.IBitcoinChainService,Microsoft.Extensions.Logging.ILogger`1<NLightning.Infrastructure.Bitcoin.Wallet.BlockchainMonitorService>,Microsoft.Extensions.Options.IOptions`1<NLightning.Domain.Node.Options.NodeOptions>,System.IServiceProvider)
get_LastProcessedBlockHeight()
StartAsync()
StartAsync()
StopAsync()
StopAsync()
WatchTransactionAsync()
PublishAndWatchTransactionAsync()
WatchTransactionAsync()
MonitorBlockchainAsync()
WatchBitcoinAddress(NLightning.Domain.Bitcoin.Wallet.Models.WalletAddressModel)
MonitorBlockchainAsync()
InitializeZmqSockets()
CleanupZmqSockets()
InitializeZmqSockets()
ProcessPendingBlocksAsync()
AddMissingBlocksToProcessAsync()
CleanupZmqSockets()
ProcessPendingBlocksAsync()
ProcessNewBlock()
AddMissingBlocksToProcessAsync()
ProcessNewBlock()
ProcessBlock(NBitcoin.Block,System.UInt32,NLightning.Domain.Persistence.Interfaces.IUnitOfWork)
ConfirmTransaction(System.UInt32,NLightning.Domain.Persistence.Interfaces.IUnitOfWork,NLightning.Domain.Bitcoin.Transactions.Models.WatchedTransactionModel)
ProcessBlock(NBitcoin.Block,System.UInt32,NLightning.Domain.Persistence.Interfaces.IUnitOfWork)
CheckWatchedTransactionsForBlock(System.Collections.Generic.List`1<NBitcoin.Transaction>,System.UInt32,NLightning.Domain.Persistence.Interfaces.IUnitOfWork)
ConfirmTransaction(System.UInt32,NLightning.Domain.Persistence.Interfaces.IUnitOfWork,NLightning.Domain.Bitcoin.Transactions.Models.WatchedTransactionModel)
CheckWatchedTransactionsDepth(NLightning.Domain.Persistence.Interfaces.IUnitOfWork)
CheckBlockForWatchedTransactions(System.Collections.Generic.List`1<NBitcoin.Transaction>,System.UInt32,NLightning.Domain.Persistence.Interfaces.IUnitOfWork)
LoadPendingWatchedTransactionsAsync()
CheckBlockForWalletMovement(System.Collections.Generic.List`1<NBitcoin.Transaction>,System.UInt32,NLightning.Domain.Persistence.Interfaces.IUnitOfWork)
CheckWatchedTransactionsDepth(NLightning.Domain.Persistence.Interfaces.IUnitOfWork)
LoadPendingWatchedTransactionsAsync()
LoadBitcoinAddresses(NLightning.Domain.Persistence.Interfaces.IUnitOfWork)
LoadUtxoSetAsync()