Refactor the connection manager + sync

This commit is contained in:
Kukks 2024-07-30 15:57:17 +02:00
parent f06cf66e81
commit b4dea11bc6
No known key found for this signature in database
GPG Key ID: 8E5530D9D1C93097
18 changed files with 517 additions and 445 deletions

View File

@ -1,16 +1,10 @@
using System.Net;
using System.Net.Http.Headers;
using BTCPayApp.CommonServer;
using BTCPayApp.Core.Auth;
using BTCPayApp.Core.Contracts;
using BTCPayApp.Core.Data;
using BTCPayApp.Core.Helpers;
using BTCPayApp.VSS;
using Microsoft.AspNetCore.Components.Authorization;
using Microsoft.AspNetCore.DataProtection;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
@ -19,47 +13,30 @@ using TypedSignalR.Client;
namespace BTCPayApp.Core.Attempt2;
public static class ConfigHelpers
{
public static async Task<T> GetOrSet<T>(this ISecureConfigProvider secureConfigProvider, string key, Func<Task<T>> factory)
{
var value = await secureConfigProvider.Get<T>(key);
if (value is null)
{
value = await factory();
await secureConfigProvider.Set(key, value);
}
return value;
}
}
public class BTCPayConnectionManager : IHostedService, IHubConnectionObserver
{
private readonly IDbContextFactory<AppDbContext> _dbContextFactory;
private const string ConfigDeviceIdentifierKey = "deviceIdentifier";
private readonly IAccountManager _accountManager;
private readonly AuthenticationStateProvider _authStateProvider;
private readonly ILogger<BTCPayConnectionManager> _logger;
private readonly BTCPayAppServerClient _btcPayAppServerClient;
private readonly IBTCPayAppHubClient _btcPayAppServerClientInterface;
private readonly IHttpClientFactory _httpClientFactory;
private readonly ISecureConfigProvider _secureConfigProvider;
private readonly IConfigProvider _configProvider;
private readonly SyncService _syncService;
private IDisposable? _subscription;
public IBTCPayAppHubServer? HubProxy { get; private set; }
public HubConnection? Connection { get; private set; }
private HubConnection? Connection { get; set; }
public Network? ReportedNetwork { get; private set; }
public string ReportedNodeInfo { get; set; }
public event AsyncEventHandler<(HubConnectionState Old, HubConnectionState New)>? ConnectionChanged;
private HubConnectionState _connectionState = HubConnectionState.Disconnected;
public event AsyncEventHandler<(BTCPayConnectionState Old, BTCPayConnectionState New)>? ConnectionChanged;
private BTCPayConnectionState _connectionState = BTCPayConnectionState.Init;
public HubConnectionState ConnectionState
public BTCPayConnectionState ConnectionState
{
get => Connection?.State ?? HubConnectionState.Disconnected;
get => _connectionState;
private set
{
if (_connectionState == value)
@ -72,53 +49,127 @@ public class BTCPayConnectionManager : IHostedService, IHubConnectionObserver
}
public BTCPayConnectionManager(
IDbContextFactory<AppDbContext> dbContextFactory,
IAccountManager accountManager,
AuthenticationStateProvider authStateProvider,
ILogger<BTCPayConnectionManager> logger,
BTCPayAppServerClient btcPayAppServerClient,
IBTCPayAppHubClient btcPayAppServerClientInterface,
IHttpClientFactory httpClientFactory,
ISecureConfigProvider secureConfigProvider)
IConfigProvider configProvider,
SyncService syncService)
{
_dbContextFactory = dbContextFactory;
_accountManager = accountManager;
_authStateProvider = authStateProvider;
_logger = logger;
_btcPayAppServerClient = btcPayAppServerClient;
_btcPayAppServerClientInterface = btcPayAppServerClientInterface;
_httpClientFactory = httpClientFactory;
_secureConfigProvider = secureConfigProvider;
_configProvider = configProvider;
_syncService = syncService;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
ConnectionChanged += OnConnectionChanged;
_authStateProvider.AuthenticationStateChanged += OnAuthenticationStateChanged;
_btcPayAppServerClient.OnNotifyNetwork += OnNotifyNetwork;
_btcPayAppServerClient.OnNotifyServerEvent += OnNotifyServerEvent;
_btcPayAppServerClient.OnServerNodeInfo += OnServerNodeInfo;
await StartOrReplace();
_ = TryStayConnected();
}
private async Task<IDataProtector> GetDataProtector()
{
var key = await _secureConfigProvider.GetOrSet("encryptionKey", async () => Convert.ToHexString(RandomUtils.GetBytes(32)).ToLowerInvariant());
return new SingleKeyDataProtector(Convert.FromHexString(key));
await OnConnectionChanged(this, (BTCPayConnectionState.Init, BTCPayConnectionState.Init));
}
public async Task<IVSSAPI> GetVSSAPI()
private async Task<long> GetDeviceIdentifier()
{
if (Connection is null)
throw new InvalidOperationException("Connection is not established");
var vssUri = new Uri(new Uri(_accountManager.GetAccount().BaseUri), "vss/");
var httpClient = _httpClientFactory.CreateClient("vss");
httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", _accountManager.GetAccount().AccessToken);
var vssClient = new HttpVSSAPIClient(vssUri, httpClient);
var protector = await GetDataProtector();
return new VSSApiEncryptorClient(vssClient, protector);
return await _configProvider.GetOrSet(ConfigDeviceIdentifierKey,
async () => RandomUtils.GetInt64(), false);
}
private async Task OnConnectionChanged(object? sender, (BTCPayConnectionState Old, BTCPayConnectionState New) e)
{
var account = _accountManager.GetAccount();
switch (e.New)
{
case BTCPayConnectionState.Init:
ConnectionState = BTCPayConnectionState.WaitingForAuth;
break;
case BTCPayConnectionState.WaitingForAuth:
await Kill();
if (account is not null)
{
ConnectionState = BTCPayConnectionState.Connecting;
}
break;
case BTCPayConnectionState.Connecting:
if (account is null)
{
ConnectionState = BTCPayConnectionState.WaitingForAuth;
break;
}
if (Connection is null)
{
Connection = new HubConnectionBuilder()
.AddNewtonsoftJsonProtocol(options =>
{
NBitcoin.JsonConverters.Serializer.RegisterFrontConverters(
options.PayloadSerializerSettings);
})
.WithUrl(new Uri(new Uri(account.BaseUri), "hub/btcpayapp").ToString(),
options =>
{
options.AccessTokenProvider = () =>
Task.FromResult(_accountManager.GetAccount()?.AccessToken);
})
.WithAutomaticReconnect()
.Build();
_subscription = Connection.Register(_btcPayAppServerClientInterface);
HubProxy = Connection.CreateHubProxy<IBTCPayAppHubServer>();
}
if (Connection.State == HubConnectionState.Disconnected)
{
try
{
await Connection.StartAsync();
if (Connection.State == HubConnectionState.Connected)
{
ConnectionState = BTCPayConnectionState.Syncing;
}
}
catch (HttpRequestException ex) when (ex.StatusCode is HttpStatusCode.Unauthorized)
{
await _accountManager.RefreshAccess();
ConnectionState = BTCPayConnectionState.WaitingForAuth;
}
}
break;
case BTCPayConnectionState.Syncing:
await _syncService.SyncToLocal();
ConnectionState = BTCPayConnectionState.ConnectedFinishedInitialSync;
break;
case BTCPayConnectionState.ConnectedFinishedInitialSync:
var deviceIdentifier = await GetDeviceIdentifier();
var master = await HubProxy.DeviceMasterSignal(deviceIdentifier, true);
ConnectionState =
master ? BTCPayConnectionState.ConnectedAsMaster : BTCPayConnectionState.ConnectedAsSlave;
break;
case BTCPayConnectionState.ConnectedAsMaster:
await _syncService.StartSync(false, await GetDeviceIdentifier());
break;
case BTCPayConnectionState.ConnectedAsSlave:
await _syncService.StartSync(true, await GetDeviceIdentifier());
break;
case BTCPayConnectionState.Disconnected:
ConnectionState = BTCPayConnectionState.WaitingForAuth;
break;
}
}
private async Task OnServerNodeInfo(object? sender, string e)
{
ReportedNodeInfo = e;
@ -140,10 +191,8 @@ public class BTCPayConnectionManager : IHostedService, IHubConnectionObserver
{
await task;
var authenticated = await _accountManager.CheckAuthenticated();
if (!authenticated)
await Kill();
else
await StartOrReplace();
await Kill();
ConnectionState = !authenticated ? BTCPayConnectionState.WaitingForAuth : BTCPayConnectionState.Connecting;
}
catch (Exception e)
{
@ -151,102 +200,55 @@ public class BTCPayConnectionManager : IHostedService, IHubConnectionObserver
}
}
private async Task MarkConnected()
{
// await new RemoteToLocalSyncService(_dbContextFactory,this).Sync();
ConnectionState = HubConnectionState.Connected;
}
private async Task TryStayConnected()
{
while (true)
{
try
{
if (Connection is not null && ConnectionState == HubConnectionState.Disconnected)
{
await Connection.StartAsync();
await MarkConnected();
}
else
{
await Task.Delay(5000);
}
}
catch (HttpRequestException ex) when (ex.StatusCode is HttpStatusCode.Unauthorized)
{
var result = await _accountManager.RefreshAccess();
if (result.Succeeded)
await StartOrReplace();
else
await Kill();
await Task.Delay(1000);
}
catch (Exception e)
{
await Task.Delay(1000);
}
}
}
private async Task Kill()
{
if (Connection is not null)
await Connection.StopAsync();
var conn = Connection;
Connection = null;
ConnectionState = HubConnectionState.Disconnected;
if (conn is not null)
await conn.StopAsync();
_subscription?.Dispose();
_subscription = null;
HubProxy = null;
await _syncService.StopSync();
}
private async Task StartOrReplace()
public async Task StopAsync(CancellationToken cancellationToken)
{
if (_connectionState == BTCPayConnectionState.ConnectedAsMaster)
{
_logger.LogInformation("Sending device master signal to turn off");
var deviceIdentifier = await GetDeviceIdentifier();
await HubProxy.DeviceMasterSignal(deviceIdentifier, true);
}
await Kill();
var account = _accountManager.GetAccount();
if (account is null)
return;
Connection = new HubConnectionBuilder()
.AddNewtonsoftJsonProtocol(options =>
{
NBitcoin.JsonConverters.Serializer.RegisterFrontConverters(options.PayloadSerializerSettings);
})
.WithUrl(new Uri(new Uri(account.BaseUri), "hub/btcpayapp").ToString(), options =>
{
options.AccessTokenProvider = () => Task.FromResult(_accountManager.GetAccount()?.AccessToken);
})
.WithAutomaticReconnect()
.Build();
_subscription = Connection.Register(_btcPayAppServerClientInterface);
HubProxy = Connection.CreateHubProxy<IBTCPayAppHubServer>();
}
public Task StopAsync(CancellationToken cancellationToken)
{
_authStateProvider.AuthenticationStateChanged -= OnAuthenticationStateChanged;
_btcPayAppServerClient.OnNotifyNetwork += OnNotifyNetwork;
return Task.CompletedTask;
ConnectionChanged -= OnConnectionChanged;
}
public Task OnClosed(Exception? exception)
{
_logger.LogError(exception, "Hub connection closed");
ConnectionState = HubConnectionState.Disconnected;
if (Connection?.State == HubConnectionState.Disconnected)
{
ConnectionState = BTCPayConnectionState.Disconnected;
}
return Task.CompletedTask;
}
public async Task OnReconnected(string? connectionId)
{
_logger.LogInformation("Hub reconnected: {ConnectionId}", connectionId);
await MarkConnected();
_logger.LogInformation("Hub connection reconnected");
ConnectionState = BTCPayConnectionState.Syncing;
}
public Task OnReconnecting(Exception? exception)
public async Task OnReconnecting(Exception? exception)
{
_logger.LogWarning(exception, "Hub reconnecting");
ConnectionState = HubConnectionState.Connecting;
return Task.CompletedTask;
_logger.LogWarning(exception, "Hub connection reconnecting");
ConnectionState = BTCPayConnectionState.Connecting;
}
}
}

View File

@ -0,0 +1,13 @@
namespace BTCPayApp.Core.Attempt2;
public enum BTCPayConnectionState
{
Init,
WaitingForAuth,
Connecting,
Syncing,
Disconnected,
ConnectedAsMaster,
ConnectedAsSlave,
ConnectedFinishedInitialSync
}

View File

@ -0,0 +1,32 @@
using BTCPayApp.Core.Contracts;
namespace BTCPayApp.Core.Attempt2;
public static class ConfigHelpers
{
public static async Task<T> GetOrSet<T>(this ISecureConfigProvider secureConfigProvider, string key,
Func<Task<T>> factory)
{
var value = await secureConfigProvider.Get<T>(key);
if (Equals(value, default(T)))
{
value = await factory();
await secureConfigProvider.Set(key, value);
}
return value;
}
public static async Task<T> GetOrSet<T>(this IConfigProvider configProvider, string key, Func<Task<T>> factory,
bool backup)
{
var value = await configProvider.Get<T>(key);
if (Equals(value, default(T)))
{
value = await factory();
await configProvider.Set(key, value, backup);
}
return value;
}
}

View File

@ -240,7 +240,7 @@ public partial class LDKNode : IAsyncDisposable, IHostedService, IDisposable
if (!exists)
return;
var identifier = _onChainWalletManager.WalletConfig.Derivations[WalletDerivation.LightningScripts].Identifier;
// var identifier = _onChainWalletManager.WalletConfig.Derivations[WalletDerivation.LightningScripts].Identifier;
_logger.LogInformation("Stopping LDKNode services");
@ -252,7 +252,7 @@ public partial class LDKNode : IAsyncDisposable, IHostedService, IDisposable
_logger.LogInformation($"Stopped {service.GetType().Name}");
}).ToArray();
await Task.WhenAll(tasks);
_ = _connectionManager.HubProxy.IdentifierActive(identifier, false).RunSync();
// _ = _connectionManager.HubProxy.DeviceMasterSignal(identifier, false).RunSync();
}

View File

@ -1,3 +1,4 @@
using BTCPayApp.Core.Auth;
using BTCPayApp.Core.Data;
using BTCPayApp.Core.Helpers;
using Microsoft.AspNetCore.SignalR.Client;
@ -11,6 +12,7 @@ public class LightningNodeManager : BaseHostedService
{
public const string PaymentMethodId = "BTC-LN";
private readonly IAccountManager _accountManager;
private readonly IDbContextFactory<AppDbContext> _dbContextFactory;
private readonly ILogger<LightningNodeManager> _logger;
private readonly OnChainWalletManager _onChainWalletManager;
@ -20,13 +22,12 @@ public class LightningNodeManager : BaseHostedService
private IServiceScope? _nodeScope;
public LDKNode? Node => _nodeScope?.ServiceProvider.GetService<LDKNode>();
private LightningNodeState _state = LightningNodeState.Init;
private bool IsHubConnected => _btcPayConnectionManager.ConnectionState is HubConnectionState.Connected;
private bool IsHubConnected => _btcPayConnectionManager.ConnectionState is BTCPayConnectionState.ConnectedAsMaster;
private bool IsOnchainConfigured => _onChainWalletManager.WalletConfig is not null;
private bool IsOnchainLightningDerivationConfigured => _onChainWalletManager.WalletConfig?.Derivations.ContainsKey(WalletDerivation.LightningScripts) is true;
public bool CanConfigureLightningNode => IsHubConnected && IsOnchainConfigured && !IsOnchainLightningDerivationConfigured && State == LightningNodeState.NotConfigured;
public string? ConnectionString => IsOnchainLightningDerivationConfigured
? $"type=app;group={_onChainWalletManager.WalletConfig!.Derivations[WalletDerivation.LightningScripts].Identifier}".ToLower()
: null;
public string? ConnectionString => IsOnchainLightningDerivationConfigured && _accountManager.GetUserInfo() is {} acc
? $"type=app;user={acc.UserId}": null;
public LightningNodeState State
{
@ -45,12 +46,14 @@ public class LightningNodeManager : BaseHostedService
public event AsyncEventHandler<(LightningNodeState Old, LightningNodeState New)>? StateChanged;
public LightningNodeManager(
IAccountManager accountManager,
IDbContextFactory<AppDbContext> dbContextFactory,
ILogger<LightningNodeManager> logger,
OnChainWalletManager onChainWalletManager,
BTCPayConnectionManager btcPayConnectionManager,
IServiceScopeFactory serviceScopeFactory)
{
_accountManager = accountManager;
_dbContextFactory = dbContextFactory;
_logger = logger;
_onChainWalletManager = onChainWalletManager;
@ -160,15 +163,16 @@ public class LightningNodeManager : BaseHostedService
}
}
private async Task OnConnectionChanged(object? sender, (HubConnectionState Old, HubConnectionState New) state)
private async Task OnConnectionChanged(object? sender, (BTCPayConnectionState Old, BTCPayConnectionState New) valueTuple)
{
if (IsHubConnected && State == LightningNodeState.WaitingForConnection)
switch (IsHubConnected)
{
State = LightningNodeState.Loading;
}
else if (_btcPayConnectionManager.ConnectionState == HubConnectionState.Disconnected && State is LightningNodeState.Loading or LightningNodeState.Loaded)
{
_ = StopNode();
case true when State == LightningNodeState.WaitingForConnection:
State = LightningNodeState.Loading;
break;
case true when State is LightningNodeState.Loading or LightningNodeState.Loaded:
_ = StopNode();
break;
}
}
@ -205,19 +209,7 @@ public class LightningNodeManager : BaseHostedService
newState = LightningNodeState.NotConfigured;
break;
}
var result = await _btcPayConnectionManager.HubProxy!
.IdentifierActive(_onChainWalletManager.WalletConfig!.Derivations[WalletDerivation.LightningScripts].Identifier, true)
.RunSync();
if (result)
{
await StartNode();
}
else
{
//TODO: Introduce a new state so that this node knows that another instance is active
newState = LightningNodeState.Error;
}
await StartNode();
break;
case LightningNodeState.NotConfigured:

View File

@ -73,7 +73,7 @@ public class OnChainWalletManager : BaseHostedService
}
}
private bool IsHubConnected => _btcPayConnectionManager.ConnectionState is HubConnectionState.Connected;
private bool IsHubConnected => _btcPayConnectionManager.ConnectionState is BTCPayConnectionState.ConnectedAsMaster;
public bool IsConfigured => WalletConfig is not null;
private async Task OnStateChanged(object? sender, (OnChainWalletState Old, OnChainWalletState New) e)
@ -176,7 +176,7 @@ public class OnChainWalletManager : BaseHostedService
}
}
private async Task ConnectionChanged(object? sender, (HubConnectionState Old, HubConnectionState New) _)
private async Task ConnectionChanged(object? sender, (BTCPayConnectionState Old, BTCPayConnectionState New) valueTuple)
{
DetermineState();
}

View File

@ -0,0 +1,289 @@
using System.Net.Http.Headers;
using System.Text.Json;
using BTCPayApp.Core.Auth;
using BTCPayApp.Core.Contracts;
using BTCPayApp.Core.Data;
using BTCPayApp.VSS;
using Google.Protobuf;
using Microsoft.AspNetCore.DataProtection;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using NBitcoin;
using VSSProto;
namespace BTCPayApp.Core.Attempt2;
public class SyncService
{
private readonly ILogger<SyncService> _logger;
private readonly IAccountManager _accountManager;
private readonly IHttpClientFactory _httpClientFactory;
private readonly IDbContextFactory<AppDbContext> _dbContextFactory;
private readonly ISecureConfigProvider _secureConfigProvider;
public SyncService(
ILogger<SyncService> logger,
ISecureConfigProvider secureConfigProvider,
IAccountManager accountManager,
IHttpClientFactory httpClientFactory,
IDbContextFactory<AppDbContext> dbContextFactory)
{
_logger = logger;
_accountManager = accountManager;
_httpClientFactory = httpClientFactory;
_dbContextFactory = dbContextFactory;
_secureConfigProvider = secureConfigProvider;
}
private async Task<IDataProtector> GetDataProtector()
{
var key = await _secureConfigProvider.GetOrSet("encryptionKey",
async () => Convert.ToHexString(RandomUtils.GetBytes(32)).ToLowerInvariant());
return new SingleKeyDataProtector(Convert.FromHexString(key));
}
private async Task<IVSSAPI> GetVSSAPI()
{
var account = _accountManager.GetAccount();
if (account is null)
throw new InvalidOperationException("Account not found");
var vssUri = new Uri(new Uri(account.BaseUri), "vss/");
var httpClient = _httpClientFactory.CreateClient("vss");
httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", account.AccessToken);
var vssClient = new HttpVSSAPIClient(vssUri, httpClient);
var protector = await GetDataProtector();
return new VSSApiEncryptorClient(vssClient, protector);
}
private async Task<KeyValue[]> CreateLocalVersions(AppDbContext dbContext)
{
var settings = dbContext.Settings.Where(setting => setting.Backup).Select(setting => new KeyValue()
{
Key = setting.EntityKey,
Version = setting.Version
});
var channels = dbContext.LightningChannels.Select(channel => new KeyValue()
{
Key = channel.EntityKey,
Version = channel.Version
});
var payments = dbContext.LightningPayments.Select(payment => new KeyValue()
{
Key = payment.EntityKey,
Version = payment.Version
});
return await settings.Concat(channels).Concat(payments).ToArrayAsync();
}
public async Task SyncToLocal(CancellationToken cancellationToken = default)
{
var backupApi = await GetVSSAPI();
await using var db = await _dbContextFactory.CreateDbContextAsync(cancellationToken);
var localVersions = await CreateLocalVersions(db);
var remoteVersions = await backupApi.ListKeyVersionsAsync(new ListKeyVersionsRequest(), cancellationToken);
await db.Database.BeginTransactionAsync(cancellationToken);
try
{
var triggers = await db.Database
.SqlQuery<TriggerRecord>($"SELECT name, sql FROM sqlite_master WHERE type = 'trigger'")
.ToListAsync(cancellationToken: cancellationToken);
await db.Database.ExecuteSqlRawAsync(
string.Join("; ", triggers.Select(trigger => $"DROP TRIGGER IF EXISTS {trigger.name}")),
cancellationToken: cancellationToken);
// delete local versions that are not in remote
// delete local versions which are lower than remote
var toDelete = localVersions.Where(localVersion =>
remoteVersions.KeyVersions.All(remoteVersion => remoteVersion.Key != localVersion.Key)
|| remoteVersions.KeyVersions.All(remoteVersion =>
remoteVersion.Key == localVersion.Key && remoteVersion.Version > localVersion.Version)).ToArray();
var toUpsert = remoteVersions.KeyVersions.Where(remoteVersion => localVersions.All(localVersion =>
localVersion.Key != remoteVersion.Key || localVersion.Version < remoteVersion.Version));
foreach (var upsertItem in toUpsert)
{
if (upsertItem.Value is null)
{
var item = await backupApi.GetObjectAsync(new GetObjectRequest()
{
Key = upsertItem.Key,
}, cancellationToken);
upsertItem.MergeFrom(item.Value);
}
}
var settingsToDelete = toDelete.Where(key => key.Key.StartsWith("Setting_")).Select(key => key.Key);
var channelsToDelete = toDelete.Where(key => key.Key.StartsWith("Channel_")).Select(key => key.Key);
var paymentsToDelete = toDelete.Where(key => key.Key.StartsWith("Payment_")).Select(key => key.Key);
await db.Settings.Where(setting => settingsToDelete.Contains(setting.EntityKey))
.ExecuteDeleteAsync(cancellationToken: cancellationToken);
await db.LightningChannels.Where(channel => channelsToDelete.Contains(channel.EntityKey))
.ExecuteDeleteAsync(cancellationToken: cancellationToken);
await db.LightningPayments.Where(payment => paymentsToDelete.Contains(payment.EntityKey))
.ExecuteDeleteAsync(cancellationToken: cancellationToken);
// upsert the rest when needed
var settingsToUpsert = toUpsert.Where(key => key.Key.StartsWith("Setting_")).Select(setting => new Setting()
{
Key = setting.Key.Split('_')[1],
Value = setting.Value.ToByteArray(),
Version = setting.Version,
Backup = true
});
var channelsToUpsert = toUpsert.Where(key => key.Key.StartsWith("Channel_"))
.Select(value => JsonSerializer.Deserialize<Channel>(value.Value.ToStringUtf8())!);
var paymentsToUpsert = toUpsert.Where(key => key.Key.StartsWith("Payment_")).Select(value =>
JsonSerializer.Deserialize<AppLightningPayment>(value.Value.ToStringUtf8())!);
await db.Settings.UpsertRange(settingsToUpsert).On(setting => setting.EntityKey)
.RunAsync(cancellationToken);
await db.LightningChannels.UpsertRange(channelsToUpsert).On(channel => channel.EntityKey)
.RunAsync(cancellationToken);
await db.LightningPayments.UpsertRange(paymentsToUpsert).On(payment => payment.EntityKey)
.RunAsync(cancellationToken);
await db.Database.ExecuteSqlRawAsync(string.Join("; ", triggers.Select(record => record.sql)),
cancellationToken: cancellationToken);
await db.Database.CommitTransactionAsync(cancellationToken);
await db.SaveChangesAsync(cancellationToken);
}
catch (Exception e)
{
await db.Database.RollbackTransactionAsync(cancellationToken);
throw;
}
}
private async Task<KeyValue?> GetValue(AppDbContext dbContext, Outbox outbox)
{
switch (outbox.Entity)
{
case "Setting":
var setting = await dbContext.Settings.SingleOrDefaultAsync(setting1 =>
setting1.EntityKey == outbox.Key && setting1.Backup);
if (setting == null)
return null;
return new KeyValue()
{
Key = outbox.Key,
Value = ByteString.CopyFrom(setting.Value),
Version = setting.Version
};
case "Channel":
var channel = await dbContext.LightningChannels.Include(channel1 => channel1.Aliases)
.SingleOrDefaultAsync(channel1 => channel1.EntityKey == outbox.Key);
if (channel == null)
return null;
var val = JsonSerializer.SerializeToUtf8Bytes(channel);
return new KeyValue()
{
Key = outbox.Key,
Value = ByteString.CopyFrom(val),
Version = channel.Version
};
case "Payment":
var payment = await dbContext.LightningPayments.SingleOrDefaultAsync(lightningPayment =>
lightningPayment.EntityKey == outbox.Key);
if (payment == null)
return null;
var paymentBytes = JsonSerializer.SerializeToUtf8Bytes(payment);
return new KeyValue()
{
Key = outbox.Key,
Value = ByteString.CopyFrom(paymentBytes),
Version = payment.Version
};
default:
throw new ArgumentOutOfRangeException();
}
}
public async Task SyncToRemote(long deviceIdentifier, CancellationToken cancellationToken = default)
{
var backupAPi = await GetVSSAPI();
await using var db = await _dbContextFactory.CreateDbContextAsync(cancellationToken);
var putObjectRequest = new PutObjectRequest();
var outbox = await db.OutboxItems.GroupBy(outbox1 => new {outbox1.Key})
.ToListAsync(cancellationToken: cancellationToken);
foreach (var outboxItemSet in outbox)
{
var orderedEnumerable = outboxItemSet.OrderByDescending(outbox1 => outbox1.Version)
.ThenByDescending(outbox1 => outbox1.ActionType).ToArray();
foreach (var item in orderedEnumerable)
{
if (item.ActionType == OutboxAction.Delete)
{
putObjectRequest.DeleteItems.Add(new KeyValue()
{
Key = item.Key, Version = item.Version
});
}
else
{
var kv = await GetValue(db, item);
if (kv != null)
{
putObjectRequest.TransactionItems.Add(kv);
break;
}
}
}
db.OutboxItems.RemoveRange(orderedEnumerable);
// Process outbox item
}
putObjectRequest.GlobalVersion = deviceIdentifier;
await backupAPi.PutObjectAsync(putObjectRequest, cancellationToken);
await db.SaveChangesAsync(cancellationToken);
}
private (Task syncTask, CancellationTokenSource cts, bool local)? _syncTask;
public async Task StartSync(bool local, long deviceIdentifier, CancellationToken cancellationToken = default)
{
if (_syncTask.HasValue && _syncTask.Value.local == local)
return;
if (_syncTask.HasValue)
{
await _syncTask.Value.cts.CancelAsync();
}
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_syncTask = (ContinuouslySync(deviceIdentifier,local, cts.Token), cts, local);
}
public async Task StopSync()
{
if (_syncTask.HasValue)
{
await _syncTask.Value.cts.CancelAsync();
_syncTask = null;
}
}
private async Task ContinuouslySync(long deviceIdentifier, bool local, CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
if (local)
await SyncToLocal(cancellationToken);
else
await SyncToRemote(deviceIdentifier, cancellationToken);
await Task.Delay(2000, cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e, "Error while syncing to {Local}", local ? "local" : "remote");
}
}
}
}

View File

@ -46,22 +46,26 @@ public class AuthStateProvider : AuthenticationStateProvider, IAccountManager, I
_identityOptions = identityOptions;
}
private CancellationTokenSource? _pingCts;
public async Task StartAsync(CancellationToken cancellationToken)
{
_ = PingOccasionally();
_pingCts = new CancellationTokenSource();
_ = PingOccasionally(_pingCts.Token);
}
private async Task PingOccasionally()
private async Task PingOccasionally(CancellationToken pingCtsToken)
{
while (_userInfo != null)
while (pingCtsToken.IsCancellationRequested is false)
{
await GetAuthenticationStateAsync();
await Task.Delay(TimeSpan.FromSeconds(5));
await Task.Delay(TimeSpan.FromSeconds(5), pingCtsToken);
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_pingCts?.Cancel();
return Task.CompletedTask;
}

View File

@ -1,176 +0,0 @@
using System.Text.Json;
using BTCPayApp.Core.Attempt2;
using BTCPayApp.Core.Helpers;
using Google.Protobuf;
using VSSProto;
using Microsoft.EntityFrameworkCore;
namespace BTCPayApp.Core.Data;
using System;
using System.Linq;
public class OutboxProcessor : IScopedHostedService
{
private readonly IDbContextFactory<AppDbContext> _dbContextFactory;
private readonly BTCPayConnectionManager _btcPayConnectionManager;
public OutboxProcessor(IDbContextFactory<AppDbContext> dbContextFactory,
BTCPayConnectionManager btcPayConnectionManager)
{
_dbContextFactory = dbContextFactory;
_btcPayConnectionManager = btcPayConnectionManager;
}
private async Task<KeyValue?> GetValue(AppDbContext dbContext, Outbox outbox)
{
switch (outbox.Entity)
{
case "Setting":
var setting = await dbContext.Settings.SingleOrDefaultAsync(setting1 => setting1.EntityKey == outbox.Key && setting1.Backup);
if (setting == null)
return null;
return new KeyValue()
{
Key = outbox.Key,
Value = ByteString.CopyFrom(setting.Value),
Version = setting.Version
};
case "Channel":
var channel = await dbContext.LightningChannels.Include(channel1 => channel1.Aliases)
.SingleOrDefaultAsync(channel1 => channel1.EntityKey == outbox.Key);
if (channel == null)
return null;
var val = JsonSerializer.SerializeToUtf8Bytes(channel);
return new KeyValue()
{
Key = outbox.Key,
Value = ByteString.CopyFrom(val),
Version = channel.Version
};
case "Payment":
var payment = await dbContext.LightningPayments .SingleOrDefaultAsync(lightningPayment => lightningPayment.EntityKey == outbox.Key);
if (payment == null)
return null;
var paymentBytes = JsonSerializer.SerializeToUtf8Bytes(payment);
return new KeyValue()
{
Key = outbox.Key,
Value = ByteString.CopyFrom(paymentBytes),
Version = payment.Version
};
default:
throw new ArgumentOutOfRangeException();
}
// switch (outbox.Entity)
// {
// case "Setting":
// var setting = await dbContext.Settings.FindAsync(outbox.Key);
// if (setting?.Backup is not true)
// return null;
// return new KeyValue()
// {
// Key = "Setting_" + outbox.Key,
// Value = ByteString.CopyFrom(setting.Value),
// Version = setting.Version
// };
// case "Channel":
// var channel = await dbContext.LightningChannels.Include(channel1 => channel1.Aliases)
// .SingleOrDefaultAsync(channel1 => channel1.Id == outbox.Key);
//
// if (channel == null)
// return null;
// var val = JsonSerializer.SerializeToUtf8Bytes(channel);
// return new KeyValue()
// {
// Key = "Channel_" + outbox.Key,
// Value = ByteString.CopyFrom(val),
// Version = channel.Version
// };
// case "Payment":
// var split = outbox.Key.Split('_');
// var paymentHash = uint256.Parse(split[0]);
// var paymentId = split[1];
// var inbound = bool.Parse(split[2]);
//
// var payment = await dbContext.LightningPayments.FindAsync(paymentHash, paymentId, inbound);
// if (payment == null)
// return null;
// var paymentBytes = JsonSerializer.SerializeToUtf8Bytes(payment);
// return new KeyValue()
// {
// Key = "Payment_" + outbox.Key,
// Value = ByteString.CopyFrom(paymentBytes),
// Version = payment.Version
// };
// default:
// throw new ArgumentOutOfRangeException();
// }
}
private async Task ProcessOutbox(CancellationToken cancellationToken = default)
{
var backupAPi = await _btcPayConnectionManager.GetVSSAPI();
await using var db = await _dbContextFactory.CreateDbContextAsync(cancellationToken);
var putObjectRequest = new PutObjectRequest();
var outbox = await db.OutboxItems.GroupBy(outbox1 => new {outbox1.Key})
.ToListAsync(cancellationToken: cancellationToken);
foreach (var outboxItemSet in outbox)
{
var orderedEnumerable = outboxItemSet.OrderByDescending(outbox1 => outbox1.Version)
.ThenByDescending(outbox1 => outbox1.ActionType).ToArray();
foreach (var item in orderedEnumerable)
{
if (item.ActionType == OutboxAction.Delete)
{
putObjectRequest.DeleteItems.Add(new KeyValue()
{
Key = item.Key, Version = item.Version
});
}
else
{
var kv = await GetValue(db, item);
if (kv != null)
{
putObjectRequest.TransactionItems.Add(kv);
break;
}
}
}
db.OutboxItems.RemoveRange(orderedEnumerable);
// Process outbox item
}
await backupAPi.PutObjectAsync(putObjectRequest, cancellationToken);
await db.SaveChangesAsync(cancellationToken);
}
private CancellationTokenSource _cts = new();
public async Task StartAsync(CancellationToken cancellationToken)
{
_cts = new CancellationTokenSource();
_ = Task.Run(async () =>
{
while (!_cts.Token.IsCancellationRequested)
{
await ProcessOutbox(_cts.Token);
await Task.Delay(2000, _cts.Token);
}
}, _cts.Token);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _cts.CancelAsync();
await ProcessOutbox(cancellationToken);
}
}

View File

@ -1,8 +1,5 @@
using System.Security.Cryptography.X509Certificates;
using System.Text.Json;
using System.Text.Json;
using BTCPayApp.Core.Attempt2;
using BTCPayApp.Core.Helpers;
using Microsoft.AspNetCore.DataProtection;
using Microsoft.EntityFrameworkCore;
using VSSProto;
@ -24,94 +21,10 @@ public class RemoteToLocalSyncService
_dbContextFactory = dbContextFactory;
_btcPayConnectionManager = btcPayConnectionManager;
}
// on connected to btcpay, sync all the data from the remote to the local
// if we are the active node
private async Task<KeyValue[]> CreateLocalVersions(AppDbContext dbContext)
{
var settings = dbContext.Settings.Where(setting => setting.Backup).Select(setting => new KeyValue()
{
Key = setting.EntityKey,
Version = setting.Version
});
var channels = dbContext.LightningChannels.Select(channel => new KeyValue()
{
Key = channel.EntityKey,
Version = channel.Version
});
var payments = dbContext.LightningPayments.Select(payment => new KeyValue()
{
Key = payment.EntityKey,
Version = payment.Version
});
return await settings.Concat(channels).Concat(payments).ToArrayAsync();
}
public async Task Sync()
{
var backupApi = await _btcPayConnectionManager.GetVSSAPI();
await using var db = await _dbContextFactory.CreateDbContextAsync();
var localVersions = await CreateLocalVersions(db);
var remoteVersions = await backupApi.ListKeyVersionsAsync(new ListKeyVersionsRequest());
await db.Database.BeginTransactionAsync();
try
{
var triggers = await db.Database.SqlQuery<TriggerRecord>($"SELECT name, sql FROM sqlite_master WHERE type = 'trigger'").ToListAsync();
await db.Database.ExecuteSqlRawAsync( string.Join("; ", triggers.Select(trigger => $"DROP TRIGGER IF EXISTS {trigger.name}")));
// delete local versions that are not in remote
// delete local versions which are lower than remote
var toDelete = localVersions.Where(localVersion =>
remoteVersions.KeyVersions.All(remoteVersion => remoteVersion.Key != localVersion.Key)
|| remoteVersions.KeyVersions.All(remoteVersion => remoteVersion.Key == localVersion.Key && remoteVersion.Version > localVersion.Version)).ToArray();
var toUpsert = remoteVersions.KeyVersions.Where(remoteVersion => localVersions.All(localVersion => localVersion.Key != remoteVersion.Key || localVersion.Version < remoteVersion.Version));
foreach (var upsertItem in toUpsert)
{
if(upsertItem.Value is null)
{
var item = await backupApi.GetObjectAsync(new GetObjectRequest()
{
Key = upsertItem.Key,
});
upsertItem.MergeFrom(item.Value);
}
}
var settingsToDelete = toDelete.Where(key => key.Key.StartsWith("Setting_")).Select(key => key.Key);
var channelsToDelete = toDelete.Where(key => key.Key.StartsWith("Channel_")).Select(key => key.Key);
var paymentsToDelete = toDelete.Where(key => key.Key.StartsWith("Payment_")).Select(key => key.Key);
await db.Settings.Where(setting => settingsToDelete.Contains(setting.EntityKey)).ExecuteDeleteAsync();
await db.LightningChannels.Where(channel => channelsToDelete.Contains(channel.EntityKey)).ExecuteDeleteAsync();
await db.LightningPayments.Where(payment => paymentsToDelete.Contains(payment.EntityKey)).ExecuteDeleteAsync();
// upsert the rest when needed
var settingsToUpsert = toUpsert.Where(key => key.Key.StartsWith("Setting_")).Select(setting=> new Setting()
{
Key = setting.Key.Split('_')[1],
Value = setting.Value.ToByteArray(),
Version = setting.Version,
Backup = true
});
var channelsToUpsert = toUpsert.Where(key => key.Key.StartsWith("Channel_")).Select(value => JsonSerializer.Deserialize<Channel>(value.Value.ToStringUtf8())!);
var paymentsToUpsert = toUpsert.Where(key => key.Key.StartsWith("Payment_")).Select(value => JsonSerializer.Deserialize<AppLightningPayment>(value.Value.ToStringUtf8())!);
await db.Settings.UpsertRange(settingsToUpsert).On(setting => setting.EntityKey).RunAsync();
await db.LightningChannels.UpsertRange(channelsToUpsert).On(channel => channel.EntityKey).RunAsync();
await db.LightningPayments.UpsertRange(paymentsToUpsert).On(payment => payment.EntityKey).RunAsync();
await db.Database.ExecuteSqlRawAsync(string.Join("; ", triggers.Select(record => record.sql)));
await db.Database.CommitTransactionAsync();
await db.SaveChangesAsync();
}
catch (Exception e)
{
await db.Database.RollbackTransactionAsync();
throw;
}
}
}

View File

@ -212,10 +212,8 @@ public static class LDKExtensions
services.AddScoped<PaymentsManager>();
services.AddScoped<BTCPayPaymentsNotifier>();
services.AddScoped<BTCPayPaymentsNotifier>();
services.AddScoped<OutboxProcessor>();
// services.AddScoped<IScopedHostedService>(provider =>
// provider.GetRequiredService<LDKSpendableOutputEventHandler>());
services.AddScoped<IScopedHostedService>(provider => provider.GetRequiredService<OutboxProcessor>());
services.AddScoped<IScopedHostedService>(provider => provider.GetRequiredService<LDKChannelSync>());
services.AddScoped<IScopedHostedService>(provider => provider.GetRequiredService<PaymentsManager>());
services.AddScoped<IScopedHostedService>(provider => provider.GetRequiredService<LDKBackgroundProcessor>());

View File

@ -29,6 +29,7 @@ public static class StartupExtensions
serviceCollection.AddHostedService<AppDatabaseMigrator>();
serviceCollection.AddHttpClient();
serviceCollection.AddSingleton<BTCPayConnectionManager>();
serviceCollection.AddSingleton<SyncService>();
serviceCollection.AddSingleton<LightningNodeManager>();
serviceCollection.AddSingleton<OnChainWalletManager>();
serviceCollection.AddSingleton<BTCPayAppServerClient>();

View File

@ -27,11 +27,15 @@
public SetupState SetupStateConnection() {
return State.Value.ConnectionState switch
{
HubConnectionState.Connected => SetupState.Completed,
HubConnectionState.Connecting => SetupState.Pending,
HubConnectionState.Reconnecting => SetupState.Pending,
HubConnectionState.Disconnected => SetupState.Pending,
_ => SetupState.Undetermined
BTCPayConnectionState.Init => SetupState.Pending,
BTCPayConnectionState.WaitingForAuth => SetupState.Pending,
BTCPayConnectionState.Connecting => SetupState.Pending,
BTCPayConnectionState.Syncing => SetupState.Pending,
BTCPayConnectionState.Disconnected => SetupState.Failed,
BTCPayConnectionState.ConnectedAsMaster => SetupState.Completed,
BTCPayConnectionState.ConnectedAsSlave => SetupState.Completed,
BTCPayConnectionState.ConnectedFinishedInitialSync => SetupState.Pending,
_ => SetupState.Undetermined
};
}

View File

@ -7,11 +7,11 @@ namespace BTCPayApp.UI.Features;
[FeatureState]
public record RootState
{
public HubConnectionState? ConnectionState;
public BTCPayConnectionState ConnectionState;
public OnChainWalletState? OnchainWalletState;
public LightningNodeState? LightningNodeState;
public record ConnectionStateUpdatedAction(HubConnectionState? State);
public record ConnectionStateUpdatedAction(BTCPayConnectionState State);
public record OnChainWalletStateUpdatedAction(OnChainWalletState State);
public record LightningNodeStateUpdatedAction(LightningNodeState State);

View File

@ -164,7 +164,7 @@
<ul class="list-group list-group-flush list-group-links">
<li class="list-group-item">
<div class="justify-content-start">
<span class="m-2 btcpay-status btcpay-status--@(State.Value.ConnectionState switch { HubConnectionState.Connected => "enabled", HubConnectionState.Disconnected => "disabled", _ => "pending" })"></span>
<span class="m-2 btcpay-status btcpay-status--@(State.Value.ConnectionState switch { BTCPayConnectionState.ConnectedAsMaster or BTCPayConnectionState.ConnectedAsSlave => "enabled", BTCPayConnectionState.Disconnected => "disabled", _ => "pending" })"></span>
<span>Hub: @State.Value.ConnectionState</span>
</div>
</li>

View File

@ -84,7 +84,7 @@
@code {
private bool CanConfigureWallet =>
State.Value.OnchainWalletState == OnChainWalletState.NotConfigured &&
State.Value.ConnectionState == HubConnectionState.Connected;
State.Value.ConnectionState == BTCPayConnectionState.ConnectedAsMaster;
private AppUserStoreInfo? Store => AccountManager.GetCurrentStore();
private string? _storePaymentMethodIdentifier;

View File

@ -101,7 +101,7 @@
private bool CanConfigureWallet =>
State.Value.OnchainWalletState == OnChainWalletState.NotConfigured &&
State.Value.ConnectionState == HubConnectionState.Connected;
State.Value.ConnectionState == BTCPayConnectionState.ConnectedAsMaster;
protected override async Task OnInitializedAsync()
{

@ -1 +1 @@
Subproject commit 0df173b415068e2117d0d67fc2a528b9b83afe40
Subproject commit 7bdb136c276f53a4c24b01d98f8bcb2de6f52254