Merge branch 'cold-and-shards'
This commit is contained in:
2
MareAPI
2
MareAPI
Submodule MareAPI updated: cd8934a4ab...b529a101ae
@@ -1,3 +1,3 @@
|
||||
namespace MareSynchronosServer.Authentication;
|
||||
|
||||
public record SecretKeyAuthReply(bool Success, string Uid, bool TempBan, bool Permaban);
|
||||
public record SecretKeyAuthReply(bool Success, string Uid, string Alias, bool TempBan, bool Permaban);
|
||||
|
||||
@@ -10,30 +10,24 @@ namespace MareSynchronosServer.Authentication;
|
||||
public class SecretKeyAuthenticatorService
|
||||
{
|
||||
private readonly MareMetrics _metrics;
|
||||
private readonly IServiceScopeFactory _serviceScopeFactory;
|
||||
private readonly MareDbContext _mareDbContext;
|
||||
private readonly IConfigurationService<MareConfigurationAuthBase> _configurationService;
|
||||
private readonly ILogger<SecretKeyAuthenticatorService> _logger;
|
||||
private readonly ConcurrentDictionary<string, SecretKeyAuthReply> _cachedPositiveResponses = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, SecretKeyFailedAuthorization> _failedAuthorizations = new(StringComparer.Ordinal);
|
||||
|
||||
public SecretKeyAuthenticatorService(MareMetrics metrics, IServiceScopeFactory serviceScopeFactory, IConfigurationService<MareConfigurationAuthBase> configuration, ILogger<SecretKeyAuthenticatorService> logger)
|
||||
public SecretKeyAuthenticatorService(MareMetrics metrics, MareDbContext mareDbContext,
|
||||
IConfigurationService<MareConfigurationAuthBase> configuration, ILogger<SecretKeyAuthenticatorService> logger)
|
||||
{
|
||||
_logger = logger;
|
||||
_configurationService = configuration;
|
||||
_metrics = metrics;
|
||||
_serviceScopeFactory = serviceScopeFactory;
|
||||
_mareDbContext = mareDbContext;
|
||||
}
|
||||
|
||||
public async Task<SecretKeyAuthReply> AuthorizeAsync(string ip, string hashedSecretKey)
|
||||
{
|
||||
_metrics.IncCounter(MetricsAPI.CounterAuthenticationRequests);
|
||||
|
||||
if (_cachedPositiveResponses.TryGetValue(hashedSecretKey, out var cachedPositiveResponse))
|
||||
{
|
||||
_metrics.IncCounter(MetricsAPI.CounterAuthenticationCacheHits);
|
||||
return cachedPositiveResponse;
|
||||
}
|
||||
|
||||
if (_failedAuthorizations.TryGetValue(ip, out var existingFailedAuthorization)
|
||||
&& existingFailedAuthorization.FailedAttempts > _configurationService.GetValueOrDefault(nameof(MareConfigurationAuthBase.FailedAuthForTempBan), 5))
|
||||
{
|
||||
@@ -50,26 +44,17 @@ public class SecretKeyAuthenticatorService
|
||||
_failedAuthorizations.Remove(ip, out _);
|
||||
});
|
||||
}
|
||||
return new(Success: false, Uid: null, TempBan: true, Permaban: false);
|
||||
return new(Success: false, Uid: null, TempBan: true, Alias: null, Permaban: false);
|
||||
}
|
||||
|
||||
using var scope = _serviceScopeFactory.CreateScope();
|
||||
using var context = scope.ServiceProvider.GetService<MareDbContext>();
|
||||
var authReply = await context.Auth.AsNoTracking().SingleOrDefaultAsync(u => u.HashedKey == hashedSecretKey).ConfigureAwait(false);
|
||||
var authReply = await _mareDbContext.Auth.Include(a => a.User).AsNoTracking()
|
||||
.SingleOrDefaultAsync(u => u.HashedKey == hashedSecretKey).ConfigureAwait(false);
|
||||
|
||||
SecretKeyAuthReply reply = new(authReply != null, authReply?.UserUID, false, authReply?.IsBanned ?? false);
|
||||
SecretKeyAuthReply reply = new(authReply != null, authReply?.UserUID, authReply?.User?.Alias ?? string.Empty, TempBan: false, authReply?.IsBanned ?? false);
|
||||
|
||||
if (reply.Success)
|
||||
{
|
||||
_metrics.IncCounter(MetricsAPI.CounterAuthenticationSuccesses);
|
||||
|
||||
_cachedPositiveResponses[hashedSecretKey] = reply;
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
await Task.Delay(TimeSpan.FromMinutes(5)).ConfigureAwait(false);
|
||||
_cachedPositiveResponses.TryRemove(hashedSecretKey, out _);
|
||||
});
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -97,6 +82,6 @@ public class SecretKeyAuthenticatorService
|
||||
}
|
||||
}
|
||||
|
||||
return new(Success: false, Uid: null, TempBan: false, Permaban: false);
|
||||
return new(Success: false, Uid: null, Alias: null, TempBan: false, Permaban: false);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
using MareSynchronos.API.Routes;
|
||||
using MareSynchronosServer.Authentication;
|
||||
using MareSynchronosServer.Services;
|
||||
using MareSynchronosShared;
|
||||
using MareSynchronosShared.Data;
|
||||
using MareSynchronosShared.Models;
|
||||
@@ -23,6 +24,7 @@ public class JwtController : Controller
|
||||
private readonly IHttpContextAccessor _accessor;
|
||||
private readonly IRedisDatabase _redis;
|
||||
private readonly MareDbContext _mareDbContext;
|
||||
private readonly GeoIPService _geoIPProvider;
|
||||
private readonly SecretKeyAuthenticatorService _secretKeyAuthenticatorService;
|
||||
private readonly AccountRegistrationService _accountRegistrationService;
|
||||
private readonly IConfigurationService<MareConfigurationAuthBase> _configuration;
|
||||
@@ -31,10 +33,11 @@ public class JwtController : Controller
|
||||
SecretKeyAuthenticatorService secretKeyAuthenticatorService,
|
||||
AccountRegistrationService accountRegistrationService,
|
||||
IConfigurationService<MareConfigurationAuthBase> configuration,
|
||||
IRedisDatabase redisDb)
|
||||
IRedisDatabase redisDb, GeoIPService geoIPProvider)
|
||||
{
|
||||
_accessor = accessor;
|
||||
_redis = redisDb;
|
||||
_geoIPProvider = geoIPProvider;
|
||||
_mareDbContext = mareDbContext;
|
||||
_secretKeyAuthenticatorService = secretKeyAuthenticatorService;
|
||||
_accountRegistrationService = accountRegistrationService;
|
||||
@@ -66,7 +69,7 @@ public class JwtController : Controller
|
||||
}
|
||||
|
||||
if (!authResult.Success && !authResult.TempBan) return Unauthorized("The provided secret key is invalid. Verify your accounts existence and/or recover the secret key.");
|
||||
if (!authResult.Success && authResult.TempBan) return Unauthorized("You are temporarily banned. Try connecting again in 5 minutes.");
|
||||
if (!authResult.Success && authResult.TempBan) return Unauthorized("Due to an excessive amount of failed authentication attempts you are temporarily banned. Check your Secret Key configuration and try connecting again in 5 minutes.");
|
||||
if (authResult.Permaban)
|
||||
{
|
||||
if (!_mareDbContext.BannedUsers.Any(c => c.CharacterIdentification == charaIdent))
|
||||
@@ -112,6 +115,8 @@ public class JwtController : Controller
|
||||
{
|
||||
new Claim(MareClaimTypes.Uid, authResult.Uid),
|
||||
new Claim(MareClaimTypes.CharaIdent, charaIdent),
|
||||
new Claim(MareClaimTypes.Alias, authResult.Alias),
|
||||
new Claim(MareClaimTypes.Continent, await _geoIPProvider.GetCountryFromIP(_accessor)),
|
||||
});
|
||||
|
||||
return Content(token.RawData);
|
||||
@@ -140,3 +145,4 @@ public class JwtController : Controller
|
||||
return handler.CreateJwtSecurityToken(token);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
using MareSynchronos.API.Routes;
|
||||
using MareSynchronos.API.SignalR;
|
||||
using MareSynchronosServer.Hubs;
|
||||
using MareSynchronosServer.Services;
|
||||
using Microsoft.AspNetCore.Authorization;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
|
||||
namespace MareSynchronosServer.Controllers;
|
||||
|
||||
[Route(MareFiles.Main)]
|
||||
public class MainController : Controller
|
||||
{
|
||||
private IHubContext<MareHub, IMareHub> _hubContext;
|
||||
|
||||
public MainController(ILogger<MainController> logger, IHubContext<MareHub, IMareHub> hubContext)
|
||||
{
|
||||
_hubContext = hubContext;
|
||||
}
|
||||
|
||||
[HttpGet(MareFiles.Main_SendReady)]
|
||||
[Authorize(Policy = "Internal")]
|
||||
public IActionResult SendReadyToClients(string uid, Guid requestId)
|
||||
{
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
await _hubContext.Clients.User(uid).Client_DownloadReady(requestId);
|
||||
});
|
||||
return Ok();
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,8 @@ public partial class MareHub
|
||||
|
||||
public string UserUID => Context.User?.Claims?.SingleOrDefault(c => string.Equals(c.Type, MareClaimTypes.Uid, StringComparison.Ordinal))?.Value ?? throw new Exception("No UID in Claims");
|
||||
|
||||
public string Continent => Context.User?.Claims?.SingleOrDefault(c => string.Equals(c.Type, MareClaimTypes.Continent, StringComparison.Ordinal))?.Value ?? "UNK";
|
||||
|
||||
private async Task DeleteUser(User user)
|
||||
{
|
||||
var ownPairData = await _dbContext.ClientPairs.Where(u => u.User.UID == user.UID).ToListAsync().ConfigureAwait(false);
|
||||
@@ -35,11 +37,6 @@ public partial class MareHub
|
||||
_dbContext.Remove(userProfileData);
|
||||
}
|
||||
|
||||
while (_dbContext.Files.Any(f => f.Uploader == user))
|
||||
{
|
||||
await Task.Delay(1000).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
_dbContext.ClientPairs.RemoveRange(ownPairData);
|
||||
await _dbContext.SaveChangesAsync().ConfigureAwait(false);
|
||||
var otherPairData = await _dbContext.ClientPairs.Include(u => u.User)
|
||||
|
||||
@@ -93,7 +93,7 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
|
||||
[Authorize(Policy = "Authenticated")]
|
||||
public override async Task OnConnectedAsync()
|
||||
{
|
||||
_mareMetrics.IncGauge(MetricsAPI.GaugeConnections);
|
||||
_mareMetrics.IncGaugeWithLabels(MetricsAPI.GaugeConnections, labels: Continent);
|
||||
|
||||
try
|
||||
{
|
||||
@@ -109,7 +109,7 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
|
||||
[Authorize(Policy = "Authenticated")]
|
||||
public override async Task OnDisconnectedAsync(Exception exception)
|
||||
{
|
||||
_mareMetrics.DecGauge(MetricsAPI.GaugeConnections);
|
||||
_mareMetrics.DecGaugeWithLabels(MetricsAPI.GaugeConnections, labels: Continent);
|
||||
|
||||
try
|
||||
{
|
||||
@@ -120,9 +120,6 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
|
||||
await RemoveUserFromRedis().ConfigureAwait(false);
|
||||
|
||||
await SendOfflineToAllPairedUsers().ConfigureAwait(false);
|
||||
|
||||
_dbContext.RemoveRange(_dbContext.Files.Where(f => !f.Uploaded && f.UploaderUID == UserUID));
|
||||
await _dbContext.SaveChangesAsync().ConfigureAwait(false);
|
||||
}
|
||||
catch { }
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@
|
||||
<PrivateAssets>all</PrivateAssets>
|
||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
||||
</PackageReference>
|
||||
<PackageReference Include="MaxMind.GeoIP2" Version="5.2.0" />
|
||||
<PackageReference Include="Microsoft.Extensions.Hosting.Systemd" Version="8.0.0" />
|
||||
<PackageReference Include="Microsoft.IdentityModel.Tokens" Version="7.5.1" />
|
||||
<PackageReference Include="SixLabors.ImageSharp" Version="3.1.4" />
|
||||
|
||||
@@ -25,10 +25,8 @@ public class Program
|
||||
context.SaveChanges();
|
||||
|
||||
// clean up residuals
|
||||
var looseFiles = context.Files.Where(f => f.Uploaded == false);
|
||||
var unfinishedRegistrations = context.LodeStoneAuth.Where(c => c.StartedAt != null);
|
||||
context.RemoveRange(unfinishedRegistrations);
|
||||
context.RemoveRange(looseFiles);
|
||||
context.SaveChanges();
|
||||
|
||||
logger.LogInformation(options.ToString());
|
||||
|
||||
@@ -0,0 +1,137 @@
|
||||
using MareSynchronosShared;
|
||||
using MareSynchronosShared.Services;
|
||||
using MareSynchronosShared.Utils;
|
||||
using MaxMind.GeoIP2;
|
||||
|
||||
namespace MareSynchronosServer.Services;
|
||||
|
||||
public class GeoIPService : IHostedService
|
||||
{
|
||||
private readonly ILogger<GeoIPService> _logger;
|
||||
private readonly IConfigurationService<ServerConfiguration> _mareConfiguration;
|
||||
private bool _useGeoIP = false;
|
||||
private string _cityFile = string.Empty;
|
||||
private DatabaseReader? _dbReader;
|
||||
private DateTime _dbLastWriteTime = DateTime.Now;
|
||||
private CancellationTokenSource _fileWriteTimeCheckCts = new();
|
||||
private bool _processingReload = false;
|
||||
|
||||
public GeoIPService(ILogger<GeoIPService> logger,
|
||||
IConfigurationService<ServerConfiguration> mareConfiguration)
|
||||
{
|
||||
_logger = logger;
|
||||
_mareConfiguration = mareConfiguration;
|
||||
}
|
||||
|
||||
public async Task<string> GetCountryFromIP(IHttpContextAccessor httpContextAccessor)
|
||||
{
|
||||
if (!_useGeoIP)
|
||||
{
|
||||
return "*";
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var ip = httpContextAccessor.GetIpAddress();
|
||||
|
||||
using CancellationTokenSource waitCts = new();
|
||||
waitCts.CancelAfter(TimeSpan.FromSeconds(5));
|
||||
while (_processingReload) await Task.Delay(100, waitCts.Token).ConfigureAwait(false);
|
||||
|
||||
if (_dbReader.TryCity(ip, out var response))
|
||||
{
|
||||
var continent = response.Continent.Code;
|
||||
if (string.Equals(continent, "NA", StringComparison.Ordinal)
|
||||
&& response.Location.Longitude != null)
|
||||
{
|
||||
if (response.Location.Longitude < -102)
|
||||
{
|
||||
continent = "NA-W";
|
||||
}
|
||||
else
|
||||
{
|
||||
continent = "NA-E";
|
||||
}
|
||||
}
|
||||
|
||||
return continent ?? "*";
|
||||
}
|
||||
|
||||
return "*";
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Error handling Geo IP country in request");
|
||||
return "*";
|
||||
}
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_logger.LogInformation("GeoIP module starting update task");
|
||||
|
||||
var token = _fileWriteTimeCheckCts.Token;
|
||||
_ = PeriodicReloadTask(token);
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private async Task PeriodicReloadTask(CancellationToken token)
|
||||
{
|
||||
while (!token.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
_processingReload = true;
|
||||
|
||||
var useGeoIP = _mareConfiguration.GetValueOrDefault(nameof(ServerConfiguration.UseGeoIP), false);
|
||||
var cityFile = _mareConfiguration.GetValueOrDefault(nameof(ServerConfiguration.GeoIPDbCityFile), string.Empty);
|
||||
var lastWriteTime = new FileInfo(cityFile).LastWriteTimeUtc;
|
||||
if (useGeoIP && (!string.Equals(cityFile, _cityFile, StringComparison.OrdinalIgnoreCase) || lastWriteTime != _dbLastWriteTime))
|
||||
{
|
||||
_cityFile = cityFile;
|
||||
if (!File.Exists(_cityFile)) throw new FileNotFoundException($"Could not open GeoIP City Database, path does not exist: {_cityFile}");
|
||||
_dbReader?.Dispose();
|
||||
_dbReader = null;
|
||||
_dbReader = new DatabaseReader(_cityFile);
|
||||
_dbLastWriteTime = lastWriteTime;
|
||||
|
||||
_ = _dbReader.City("8.8.8.8").Continent;
|
||||
|
||||
_logger.LogInformation($"Loaded GeoIP city file from {_cityFile}");
|
||||
|
||||
if (_useGeoIP != useGeoIP)
|
||||
{
|
||||
_logger.LogInformation("GeoIP module is now enabled");
|
||||
_useGeoIP = useGeoIP;
|
||||
}
|
||||
}
|
||||
|
||||
if (_useGeoIP != useGeoIP && !useGeoIP)
|
||||
{
|
||||
_logger.LogInformation("GeoIP module is now disabled");
|
||||
_useGeoIP = useGeoIP;
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_logger.LogWarning(e, "Error during periodic GeoIP module reload task, disabling GeoIP");
|
||||
_useGeoIP = false;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_processingReload = false;
|
||||
}
|
||||
|
||||
await Task.Delay(TimeSpan.FromMinutes(1)).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_fileWriteTimeCheckCts.Cancel();
|
||||
_fileWriteTimeCheckCts.Dispose();
|
||||
_dbReader.Dispose();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -144,9 +144,6 @@ public class UserCleanupService : IHostedService
|
||||
|
||||
var auth = dbContext.Auth.Single(a => a.UserUID == user.UID);
|
||||
|
||||
var userFiles = dbContext.Files.Where(f => f.Uploaded && f.Uploader.UID == user.UID).ToList();
|
||||
dbContext.Files.RemoveRange(userFiles);
|
||||
|
||||
var ownPairData = dbContext.ClientPairs.Where(u => u.User.UID == user.UID).ToList();
|
||||
dbContext.ClientPairs.RemoveRange(ownPairData);
|
||||
var otherPairData = dbContext.ClientPairs.Include(u => u.User)
|
||||
|
||||
@@ -96,8 +96,10 @@ public class Startup
|
||||
|
||||
if (isMainServer)
|
||||
{
|
||||
services.AddSingleton<GeoIPService>();
|
||||
services.AddSingleton<UserCleanupService>();
|
||||
services.AddHostedService(provider => provider.GetService<UserCleanupService>());
|
||||
services.AddHostedService(provider => provider.GetService<GeoIPService>());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,10 @@ public class MareMetrics
|
||||
foreach (var gauge in gaugesToServe)
|
||||
{
|
||||
logger.LogInformation($"Creating Metric for Counter {gauge}");
|
||||
if (!string.Equals(gauge, MetricsAPI.GaugeConnections, StringComparison.OrdinalIgnoreCase))
|
||||
gauges.Add(gauge, Prometheus.Metrics.CreateGauge(gauge, gauge));
|
||||
else
|
||||
gauges.Add(gauge, Prometheus.Metrics.CreateGauge(gauge, gauge, new[] { "continent" }));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,35 +28,57 @@ public class MareMetrics
|
||||
|
||||
private readonly Dictionary<string, Gauge> gauges = new(StringComparer.Ordinal);
|
||||
|
||||
public void IncGaugeWithLabels(string gaugeName, double value = 1.0, params string[] labels)
|
||||
{
|
||||
if (gauges.TryGetValue(gaugeName, out Gauge gauge))
|
||||
{
|
||||
lock (gauge)
|
||||
gauge.WithLabels(labels).Inc(value);
|
||||
}
|
||||
}
|
||||
|
||||
public void DecGaugeWithLabels(string gaugeName, double value = 1.0, params string[] labels)
|
||||
{
|
||||
if (gauges.TryGetValue(gaugeName, out Gauge gauge))
|
||||
{
|
||||
lock (gauge)
|
||||
gauge.WithLabels(labels).Dec(value);
|
||||
}
|
||||
}
|
||||
|
||||
public void SetGaugeTo(string gaugeName, double value)
|
||||
{
|
||||
if (gauges.ContainsKey(gaugeName))
|
||||
if (gauges.TryGetValue(gaugeName, out Gauge gauge))
|
||||
{
|
||||
gauges[gaugeName].Set(value);
|
||||
lock (gauge)
|
||||
gauge.Set(value);
|
||||
}
|
||||
}
|
||||
|
||||
public void IncGauge(string gaugeName, double value = 1.0)
|
||||
{
|
||||
if (gauges.ContainsKey(gaugeName))
|
||||
if (gauges.TryGetValue(gaugeName, out Gauge gauge))
|
||||
{
|
||||
gauges[gaugeName].Inc(value);
|
||||
lock (gauge)
|
||||
gauge.Inc(value);
|
||||
}
|
||||
}
|
||||
|
||||
public void DecGauge(string gaugeName, double value = 1.0)
|
||||
{
|
||||
if (gauges.ContainsKey(gaugeName))
|
||||
if (gauges.TryGetValue(gaugeName, out Gauge gauge))
|
||||
{
|
||||
gauges[gaugeName].Dec(value);
|
||||
lock (gauge)
|
||||
gauge.Dec(value);
|
||||
}
|
||||
}
|
||||
|
||||
public void IncCounter(string counterName, double value = 1.0)
|
||||
{
|
||||
if (counters.ContainsKey(counterName))
|
||||
if (counters.TryGetValue(counterName, out Counter counter))
|
||||
{
|
||||
counters[counterName].Inc(value);
|
||||
lock (counter)
|
||||
counter.Inc(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,7 +3,7 @@
|
||||
public class MetricsAPI
|
||||
{
|
||||
public const string CounterInitializedConnections = "mare_initialized_connections";
|
||||
public const string GaugeConnections = "mare_unauthorized_connections";
|
||||
public const string GaugeConnections = "mare_connections";
|
||||
public const string GaugeAuthorizedConnections = "mare_authorized_connections";
|
||||
public const string GaugeAvailableWorkerThreads = "mare_available_threadpool";
|
||||
public const string GaugeAvailableIOWorkerThreads = "mare_available_threadpool_io";
|
||||
@@ -12,7 +12,11 @@ public class MetricsAPI
|
||||
public const string GaugePairs = "mare_pairs";
|
||||
public const string GaugePairsPaused = "mare_pairs_paused";
|
||||
public const string GaugeFilesTotal = "mare_files";
|
||||
public const string GaugeFilesTotalColdStorage = "mare_files_cold";
|
||||
public const string GaugeFilesTotalSize = "mare_files_size";
|
||||
public const string GaugeFilesTotalSizeColdStorage = "mare_files_size_cold";
|
||||
public const string GaugeFilesDownloadingFromCache = "mare_files_downloading_from_cache";
|
||||
public const string GaugeFilesTasksWaitingForDownloadFromCache = "mare_files_waiting_for_dl";
|
||||
public const string CounterUserPushData = "mare_user_push";
|
||||
public const string CounterUserPushDataTo = "mare_user_push_to";
|
||||
public const string CounterAuthenticationRequests = "mare_auth_requests";
|
||||
@@ -31,6 +35,9 @@ public class MetricsAPI
|
||||
public const string GaugeQueueActive = "mare_download_queue_active";
|
||||
public const string GaugeQueueInactive = "mare_download_queue_inactive";
|
||||
public const string GaugeDownloadQueue = "mare_download_queue";
|
||||
public const string GaugeDownloadQueueCancelled = "mare_download_queue_cancelled";
|
||||
public const string GaugeDownloadPriorityQueue = "mare_download_priority_queue";
|
||||
public const string GaugeDownloadPriorityQueueCancelled = "mare_download_priority_queue_cancelled";
|
||||
public const string CounterFileRequests = "mare_files_requests";
|
||||
public const string CounterFileRequestSize = "mare_files_request_size";
|
||||
public const string CounterAccountsCreated = "mare_accounts_created";
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
using MareSynchronosShared.Utils;
|
||||
using Microsoft.Extensions.Options;
|
||||
using System.Collections;
|
||||
using System.Text;
|
||||
|
||||
namespace MareSynchronosShared.Services;
|
||||
@@ -30,11 +31,20 @@ public class MareConfigurationServiceServer<T> : IConfigurationService<T> where
|
||||
StringBuilder sb = new();
|
||||
foreach (var prop in props)
|
||||
{
|
||||
sb.AppendLine($"{prop.Name} (IsRemote: {prop.GetCustomAttributes(typeof(RemoteConfigurationAttribute), true).Any()}) => {prop.GetValue(_config.CurrentValue)}");
|
||||
var isRemote = prop.GetCustomAttributes(typeof(RemoteConfigurationAttribute), true).Any();
|
||||
var getValueMethod = GetType().GetMethod(nameof(GetValue)).MakeGenericMethod(prop.PropertyType);
|
||||
var value = isRemote ? getValueMethod.Invoke(this, new[] { prop.Name }) : prop.GetValue(_config.CurrentValue);
|
||||
if (typeof(IEnumerable).IsAssignableFrom(prop.PropertyType) && !typeof(string).IsAssignableFrom(prop.PropertyType))
|
||||
{
|
||||
var enumVal = (IEnumerable)value;
|
||||
value = string.Empty;
|
||||
foreach (var listVal in enumVal)
|
||||
{
|
||||
value += listVal.ToString() + ", ";
|
||||
}
|
||||
}
|
||||
sb.AppendLine($"{prop.Name} (IsRemote: {isRemote}) => {value}");
|
||||
}
|
||||
|
||||
sb.AppendLine(_config.ToString());
|
||||
|
||||
return sb.ToString();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,11 +2,12 @@
|
||||
|
||||
public class CdnShardConfiguration
|
||||
{
|
||||
public List<string> Continents { get; set; }
|
||||
public string FileMatch { get; set; }
|
||||
public Uri CdnFullUrl { get; set; }
|
||||
|
||||
public override string ToString()
|
||||
{
|
||||
return CdnFullUrl.ToString() + " == " + FileMatch;
|
||||
return CdnFullUrl.ToString() + "[" + string.Join(',', Continents) + "] == " + FileMatch;
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,8 @@
|
||||
public static class MareClaimTypes
|
||||
{
|
||||
public const string Uid = "uid";
|
||||
public const string Alias = "alias";
|
||||
public const string CharaIdent = "character_identification";
|
||||
public const string Internal = "internal";
|
||||
public const string Continent = "continent";
|
||||
}
|
||||
|
||||
@@ -14,6 +14,8 @@ public class MareConfigurationAuthBase : MareConfigurationBase
|
||||
public int RegisterIpDurationInMinutes { get; set; } = 10;
|
||||
[RemoteConfiguration]
|
||||
public List<string> WhitelistedIps { get; set; } = new();
|
||||
[RemoteConfiguration]
|
||||
public bool UseGeoIP { get; set; } = false;
|
||||
|
||||
public override string ToString()
|
||||
{
|
||||
@@ -25,6 +27,7 @@ public class MareConfigurationAuthBase : MareConfigurationBase
|
||||
sb.AppendLine($"{nameof(RegisterIpDurationInMinutes)} => {RegisterIpDurationInMinutes}");
|
||||
sb.AppendLine($"{nameof(Jwt)} => {Jwt}");
|
||||
sb.AppendLine($"{nameof(WhitelistedIps)} => {string.Join(", ", WhitelistedIps)}");
|
||||
sb.AppendLine($"{nameof(UseGeoIP)} => {UseGeoIP}");
|
||||
return sb.ToString();
|
||||
}
|
||||
}
|
||||
@@ -24,6 +24,7 @@ public class ServerConfiguration : MareConfigurationAuthBase
|
||||
|
||||
[RemoteConfiguration]
|
||||
public int PurgeUnusedAccountsPeriodInDays { get; set; } = 14;
|
||||
public string GeoIPDbCityFile { get; set; } = string.Empty;
|
||||
|
||||
public int RedisPool { get; set; } = 50;
|
||||
|
||||
@@ -40,6 +41,7 @@ public class ServerConfiguration : MareConfigurationAuthBase
|
||||
sb.AppendLine($"{nameof(MaxGroupUserCount)} => {MaxGroupUserCount}");
|
||||
sb.AppendLine($"{nameof(PurgeUnusedAccounts)} => {PurgeUnusedAccounts}");
|
||||
sb.AppendLine($"{nameof(PurgeUnusedAccountsPeriodInDays)} => {PurgeUnusedAccountsPeriodInDays}");
|
||||
sb.AppendLine($"{nameof(GeoIPDbCityFile)} => {GeoIPDbCityFile}");
|
||||
return sb.ToString();
|
||||
}
|
||||
}
|
||||
@@ -65,9 +65,6 @@ public static class SharedDbFunctions
|
||||
|
||||
var auth = dbContext.Auth.Single(a => a.UserUID == user.UID);
|
||||
|
||||
var userFiles = dbContext.Files.Where(f => f.Uploaded && f.Uploader.UID == user.UID).ToList();
|
||||
dbContext.Files.RemoveRange(userFiles);
|
||||
|
||||
var ownPairData = dbContext.ClientPairs.Where(u => u.User.UID == user.UID).ToList();
|
||||
dbContext.ClientPairs.RemoveRange(ownPairData);
|
||||
var otherPairData = dbContext.ClientPairs.Include(u => u.User)
|
||||
|
||||
@@ -5,15 +5,28 @@ namespace MareSynchronosStaticFilesServer;
|
||||
|
||||
public class StaticFilesServerConfiguration : MareConfigurationBase
|
||||
{
|
||||
public bool IsDistributionNode { get; set; } = false;
|
||||
public bool NotifyMainServerDirectly { get; set; } = false;
|
||||
public Uri? MainFileServerAddress { get; set; } = null;
|
||||
public Uri? DistributionFileServerAddress { get; set; } = null;
|
||||
public bool DistributionFileServerForceHTTP2 { get; set; } = false;
|
||||
public int ForcedDeletionOfFilesAfterHours { get; set; } = -1;
|
||||
public double CacheSizeHardLimitInGiB { get; set; } = -1;
|
||||
public int MinimumFileRetentionPeriodInDays { get; set; } = 7;
|
||||
public int UnusedFileRetentionPeriodInDays { get; set; } = 14;
|
||||
public string CacheDirectory { get; set; }
|
||||
public int DownloadQueueSize { get; set; } = 50;
|
||||
public int DownloadTimeoutSeconds { get; set; } = 5;
|
||||
public int DownloadQueueReleaseSeconds { get; set; } = 15;
|
||||
public int DownloadQueueClearLimit { get; set; } = 15000;
|
||||
public int CleanupCheckInMinutes { get; set; } = 15;
|
||||
public bool UseColdStorage { get; set; } = false;
|
||||
public string? ColdStorageDirectory { get; set; } = null;
|
||||
public double ColdStorageSizeHardLimitInGiB { get; set; } = -1;
|
||||
public int ColdStorageMinimumFileRetentionPeriodInDays { get; set; } = 30;
|
||||
public int ColdStorageUnusedFileRetentionPeriodInDays { get; set; } = 30;
|
||||
public double CacheSmallSizeThresholdKiB { get; set; } = 64;
|
||||
public double CacheLargeSizeThresholdKiB { get; set; } = 1024;
|
||||
[RemoteConfiguration]
|
||||
public Uri CdnFullUrl { get; set; } = null;
|
||||
[RemoteConfiguration]
|
||||
@@ -22,13 +35,26 @@ public class StaticFilesServerConfiguration : MareConfigurationBase
|
||||
{
|
||||
StringBuilder sb = new();
|
||||
sb.AppendLine(base.ToString());
|
||||
sb.AppendLine($"{nameof(IsDistributionNode)} => {IsDistributionNode}");
|
||||
sb.AppendLine($"{nameof(NotifyMainServerDirectly)} => {NotifyMainServerDirectly}");
|
||||
sb.AppendLine($"{nameof(MainFileServerAddress)} => {MainFileServerAddress}");
|
||||
sb.AppendLine($"{nameof(DistributionFileServerAddress)} => {DistributionFileServerAddress}");
|
||||
sb.AppendLine($"{nameof(DistributionFileServerForceHTTP2)} => {DistributionFileServerForceHTTP2}");
|
||||
sb.AppendLine($"{nameof(ForcedDeletionOfFilesAfterHours)} => {ForcedDeletionOfFilesAfterHours}");
|
||||
sb.AppendLine($"{nameof(CacheSizeHardLimitInGiB)} => {CacheSizeHardLimitInGiB}");
|
||||
sb.AppendLine($"{nameof(UseColdStorage)} => {UseColdStorage}");
|
||||
sb.AppendLine($"{nameof(ColdStorageDirectory)} => {ColdStorageDirectory}");
|
||||
sb.AppendLine($"{nameof(ColdStorageSizeHardLimitInGiB)} => {ColdStorageSizeHardLimitInGiB}");
|
||||
sb.AppendLine($"{nameof(ColdStorageMinimumFileRetentionPeriodInDays)} => {ColdStorageMinimumFileRetentionPeriodInDays}");
|
||||
sb.AppendLine($"{nameof(ColdStorageUnusedFileRetentionPeriodInDays)} => {ColdStorageUnusedFileRetentionPeriodInDays}");
|
||||
sb.AppendLine($"{nameof(MinimumFileRetentionPeriodInDays)} => {MinimumFileRetentionPeriodInDays}");
|
||||
sb.AppendLine($"{nameof(UnusedFileRetentionPeriodInDays)} => {UnusedFileRetentionPeriodInDays}");
|
||||
sb.AppendLine($"{nameof(CacheSmallSizeThresholdKiB)} => {CacheSmallSizeThresholdKiB}");
|
||||
sb.AppendLine($"{nameof(CacheLargeSizeThresholdKiB)} => {CacheLargeSizeThresholdKiB}");
|
||||
sb.AppendLine($"{nameof(CacheDirectory)} => {CacheDirectory}");
|
||||
sb.AppendLine($"{nameof(DownloadQueueSize)} => {DownloadQueueSize}");
|
||||
sb.AppendLine($"{nameof(DownloadQueueReleaseSeconds)} => {DownloadQueueReleaseSeconds}");
|
||||
sb.AppendLine($"{nameof(CdnShardConfiguration)} => {string.Join(", ", CdnShardConfiguration)}");
|
||||
return sb.ToString();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,4 +13,6 @@ public class ControllerBase : Controller
|
||||
}
|
||||
|
||||
protected string MareUser => HttpContext.User.Claims.First(f => string.Equals(f.Type, MareClaimTypes.Uid, StringComparison.Ordinal)).Value;
|
||||
protected string Continent => HttpContext.User.Claims.FirstOrDefault(f => string.Equals(f.Type, MareClaimTypes.Continent, StringComparison.Ordinal))?.Value ?? "*";
|
||||
protected bool IsPriority => false;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
using MareSynchronos.API.Routes;
|
||||
using MareSynchronosStaticFilesServer.Services;
|
||||
using Microsoft.AspNetCore.Authorization;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
|
||||
namespace MareSynchronosStaticFilesServer.Controllers;
|
||||
|
||||
[Route(MareFiles.Distribution)]
|
||||
public class DistributionController : ControllerBase
|
||||
{
|
||||
private readonly CachedFileProvider _cachedFileProvider;
|
||||
|
||||
public DistributionController(ILogger<DistributionController> logger, CachedFileProvider cachedFileProvider) : base(logger)
|
||||
{
|
||||
_cachedFileProvider = cachedFileProvider;
|
||||
}
|
||||
|
||||
[HttpGet(MareFiles.Distribution_Get)]
|
||||
[Authorize(Policy = "Internal")]
|
||||
public async Task<IActionResult> GetFile(string file)
|
||||
{
|
||||
_logger.LogInformation($"GetFile:{MareUser}:{file}");
|
||||
|
||||
var fs = await _cachedFileProvider.GetAndDownloadFileStream(file);
|
||||
if (fs == null) return NotFound();
|
||||
|
||||
return File(fs, "application/octet-stream");
|
||||
}
|
||||
|
||||
[HttpPost("touch")]
|
||||
[Authorize(Policy = "Internal")]
|
||||
public IActionResult TouchFiles([FromBody] string[] files)
|
||||
{
|
||||
_logger.LogInformation($"TouchFiles:{MareUser}:{files.Length}");
|
||||
|
||||
if (files.Length == 0)
|
||||
return Ok();
|
||||
|
||||
Task.Run(() => {
|
||||
foreach (var file in files)
|
||||
_cachedFileProvider.TouchColdHash(file);
|
||||
}).ConfigureAwait(false);
|
||||
|
||||
return Ok();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
using MareSynchronos.API.Routes;
|
||||
using MareSynchronosStaticFilesServer.Services;
|
||||
using Microsoft.AspNetCore.Authorization;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
|
||||
namespace MareSynchronosStaticFilesServer.Controllers;
|
||||
|
||||
[Route(MareFiles.Main)]
|
||||
public class MainController : ControllerBase
|
||||
{
|
||||
private readonly IClientReadyMessageService _messageService;
|
||||
|
||||
public MainController(ILogger<MainController> logger, IClientReadyMessageService mareHub) : base(logger)
|
||||
{
|
||||
_messageService = mareHub;
|
||||
}
|
||||
|
||||
[HttpGet(MareFiles.Main_SendReady)]
|
||||
[Authorize(Policy = "Internal")]
|
||||
public IActionResult SendReadyToClients(string uid, Guid requestId)
|
||||
{
|
||||
_messageService.SendDownloadReady(uid, requestId);
|
||||
return Ok();
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,4 @@
|
||||
using MareSynchronos.API.Routes;
|
||||
using MareSynchronosShared.Data;
|
||||
using MareSynchronosStaticFilesServer.Services;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
|
||||
@@ -10,14 +9,11 @@ public class RequestController : ControllerBase
|
||||
{
|
||||
private readonly CachedFileProvider _cachedFileProvider;
|
||||
private readonly RequestQueueService _requestQueue;
|
||||
private readonly MareDbContext _mareDbContext;
|
||||
private static readonly SemaphoreSlim _parallelRequestSemaphore = new(500);
|
||||
|
||||
public RequestController(ILogger<RequestController> logger, CachedFileProvider cachedFileProvider, RequestQueueService requestQueue, MareDbContext mareDbContext) : base(logger)
|
||||
public RequestController(ILogger<RequestController> logger, CachedFileProvider cachedFileProvider, RequestQueueService requestQueue) : base(logger)
|
||||
{
|
||||
_cachedFileProvider = cachedFileProvider;
|
||||
_requestQueue = requestQueue;
|
||||
_mareDbContext = mareDbContext;
|
||||
}
|
||||
|
||||
[HttpGet]
|
||||
@@ -26,15 +22,10 @@ public class RequestController : ControllerBase
|
||||
{
|
||||
try
|
||||
{
|
||||
await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted);
|
||||
_requestQueue.RemoveFromQueue(requestId, MareUser);
|
||||
_requestQueue.RemoveFromQueue(requestId, MareUser, IsPriority);
|
||||
return Ok();
|
||||
}
|
||||
catch (OperationCanceledException) { return BadRequest(); }
|
||||
finally
|
||||
{
|
||||
_parallelRequestSemaphore.Release();
|
||||
}
|
||||
}
|
||||
|
||||
[HttpPost]
|
||||
@@ -43,23 +34,18 @@ public class RequestController : ControllerBase
|
||||
{
|
||||
try
|
||||
{
|
||||
await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted);
|
||||
foreach (var file in files)
|
||||
{
|
||||
_logger.LogDebug("Prerequested file: " + file);
|
||||
_cachedFileProvider.DownloadFileWhenRequired(file);
|
||||
await _cachedFileProvider.DownloadFileWhenRequired(file).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
Guid g = Guid.NewGuid();
|
||||
await _requestQueue.EnqueueUser(new(g, MareUser, files.ToList()), _mareDbContext);
|
||||
await _requestQueue.EnqueueUser(new(g, MareUser, files.ToList()), IsPriority, HttpContext.RequestAborted);
|
||||
|
||||
return Ok(g);
|
||||
}
|
||||
catch (OperationCanceledException) { return BadRequest(); }
|
||||
finally
|
||||
{
|
||||
_parallelRequestSemaphore.Release();
|
||||
}
|
||||
}
|
||||
|
||||
[HttpGet]
|
||||
@@ -68,8 +54,8 @@ public class RequestController : ControllerBase
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!await _requestQueue.StillEnqueued(requestId, MareUser, _mareDbContext))
|
||||
await _requestQueue.EnqueueUser(new(requestId, MareUser, files.ToList()), _mareDbContext);
|
||||
if (!_requestQueue.StillEnqueued(requestId, MareUser, IsPriority))
|
||||
await _requestQueue.EnqueueUser(new(requestId, MareUser, files.ToList()), IsPriority, HttpContext.RequestAborted);
|
||||
return Ok();
|
||||
}
|
||||
catch (OperationCanceledException) { return BadRequest(); }
|
||||
|
||||
@@ -10,10 +10,10 @@ using MareSynchronosShared.Services;
|
||||
using MareSynchronosShared.Utils;
|
||||
using MareSynchronosStaticFilesServer.Services;
|
||||
using MareSynchronosStaticFilesServer.Utils;
|
||||
using Microsoft.AspNetCore.Authorization;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Security.Cryptography;
|
||||
using System.Security.Policy;
|
||||
@@ -28,6 +28,7 @@ public class ServerFilesController : ControllerBase
|
||||
private static readonly SemaphoreSlim _fileLockDictLock = new(1);
|
||||
private static readonly ConcurrentDictionary<string, SemaphoreSlim> _fileUploadLocks = new(StringComparer.Ordinal);
|
||||
private readonly string _basePath;
|
||||
private readonly string _coldBasePath;
|
||||
private readonly CachedFileProvider _cachedFileProvider;
|
||||
private readonly IConfigurationService<StaticFilesServerConfiguration> _configuration;
|
||||
private readonly IHubContext<MareHub> _hubContext;
|
||||
@@ -36,12 +37,14 @@ public class ServerFilesController : ControllerBase
|
||||
|
||||
public ServerFilesController(ILogger<ServerFilesController> logger, CachedFileProvider cachedFileProvider,
|
||||
IConfigurationService<StaticFilesServerConfiguration> configuration,
|
||||
IHubContext<MareSynchronosServer.Hubs.MareHub> hubContext,
|
||||
IHubContext<MareHub> hubContext,
|
||||
MareDbContext mareDbContext, MareMetrics metricsClient) : base(logger)
|
||||
{
|
||||
_basePath = configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
|
||||
_cachedFileProvider = cachedFileProvider;
|
||||
_configuration = configuration;
|
||||
_basePath = configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
|
||||
if (_configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false))
|
||||
_basePath = configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.ColdStorageDirectory));
|
||||
_cachedFileProvider = cachedFileProvider;
|
||||
_hubContext = hubContext;
|
||||
_mareDbContext = mareDbContext;
|
||||
_metricsClient = metricsClient;
|
||||
@@ -50,44 +53,47 @@ public class ServerFilesController : ControllerBase
|
||||
[HttpPost(MareFiles.ServerFiles_DeleteAll)]
|
||||
public async Task<IActionResult> FilesDeleteAll()
|
||||
{
|
||||
var ownFiles = await _mareDbContext.Files.Where(f => f.Uploaded && f.Uploader.UID == MareUser).ToListAsync().ConfigureAwait(false);
|
||||
|
||||
foreach (var dbFile in ownFiles)
|
||||
{
|
||||
var fi = FilePathUtil.GetFileInfoForHash(_basePath, dbFile.Hash);
|
||||
if (fi != null)
|
||||
{
|
||||
_metricsClient.DecGauge(MetricsAPI.GaugeFilesTotal, fi == null ? 0 : 1);
|
||||
_metricsClient.DecGauge(MetricsAPI.GaugeFilesTotalSize, fi?.Length ?? 0);
|
||||
|
||||
fi?.Delete();
|
||||
}
|
||||
}
|
||||
|
||||
_mareDbContext.Files.RemoveRange(ownFiles);
|
||||
await _mareDbContext.SaveChangesAsync().ConfigureAwait(false);
|
||||
|
||||
return Ok();
|
||||
}
|
||||
|
||||
[HttpGet(MareFiles.ServerFiles_GetSizes)]
|
||||
public async Task<IActionResult> FilesGetSizes([FromBody] List<string> hashes)
|
||||
{
|
||||
var allFiles = await _mareDbContext.Files.Where(f => hashes.Contains(f.Hash)).ToListAsync().ConfigureAwait(false);
|
||||
var forbiddenFiles = await _mareDbContext.ForbiddenUploadEntries.
|
||||
Where(f => hashes.Contains(f.Hash)).ToListAsync().ConfigureAwait(false);
|
||||
List<DownloadFileDto> response = new();
|
||||
|
||||
var cacheFile = await _mareDbContext.Files.AsNoTracking().Where(f => hashes.Contains(f.Hash)).AsNoTracking().Select(k => new { k.Hash, k.Size }).AsNoTracking().ToListAsync().ConfigureAwait(false);
|
||||
|
||||
var shardConfig = new List<CdnShardConfiguration>(_configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.CdnShardConfiguration), new List<CdnShardConfiguration>()));
|
||||
var allFileShards = new List<CdnShardConfiguration>(_configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.CdnShardConfiguration), new List<CdnShardConfiguration>()));
|
||||
|
||||
foreach (var file in cacheFile)
|
||||
{
|
||||
var forbiddenFile = forbiddenFiles.SingleOrDefault(f => string.Equals(f.Hash, file.Hash, StringComparison.OrdinalIgnoreCase));
|
||||
Uri? baseUrl = null;
|
||||
|
||||
var matchedShardConfig = shardConfig.OrderBy(g => Guid.NewGuid()).FirstOrDefault(f => new Regex(f.FileMatch).IsMatch(file.Hash));
|
||||
var baseUrl = matchedShardConfig?.CdnFullUrl ?? _configuration.GetValue<Uri>(nameof(StaticFilesServerConfiguration.CdnFullUrl));
|
||||
if (forbiddenFile == null)
|
||||
{
|
||||
List<CdnShardConfiguration> selectedShards = new();
|
||||
var matchingShards = allFileShards.Where(f => new Regex(f.FileMatch).IsMatch(file.Hash)).ToList();
|
||||
|
||||
if (string.Equals(Continent, "*", StringComparison.Ordinal))
|
||||
{
|
||||
selectedShards = matchingShards;
|
||||
}
|
||||
else
|
||||
{
|
||||
selectedShards = matchingShards.Where(c => c.Continents.Contains(Continent, StringComparer.OrdinalIgnoreCase)).ToList();
|
||||
if (!selectedShards.Any()) selectedShards = matchingShards;
|
||||
}
|
||||
|
||||
var shard = selectedShards
|
||||
.OrderBy(s => !s.Continents.Any() ? 0 : 1)
|
||||
.ThenBy(s => s.Continents.Contains("*", StringComparer.Ordinal) ? 0 : 1)
|
||||
.ThenBy(g => Guid.NewGuid()).FirstOrDefault();
|
||||
|
||||
baseUrl = shard?.CdnFullUrl ?? _configuration.GetValue<Uri>(nameof(StaticFilesServerConfiguration.CdnFullUrl));
|
||||
}
|
||||
|
||||
response.Add(new DownloadFileDto
|
||||
{
|
||||
@@ -96,7 +102,7 @@ public class ServerFilesController : ControllerBase
|
||||
IsForbidden = forbiddenFile != null,
|
||||
Hash = file.Hash,
|
||||
Size = file.Size,
|
||||
Url = baseUrl.ToString(),
|
||||
Url = baseUrl?.ToString() ?? string.Empty,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -145,18 +151,6 @@ public class ServerFilesController : ControllerBase
|
||||
return Ok(JsonSerializer.Serialize(notCoveredFiles.Values.ToList()));
|
||||
}
|
||||
|
||||
[HttpGet(MareFiles.ServerFiles_Get + "/{fileId}")]
|
||||
[Authorize(Policy = "Internal")]
|
||||
public IActionResult GetFile(string fileId)
|
||||
{
|
||||
_logger.LogInformation($"GetFile:{MareUser}:{fileId}");
|
||||
|
||||
var fs = _cachedFileProvider.GetLocalFileStream(fileId);
|
||||
if (fs == null) return NotFound();
|
||||
|
||||
return File(fs, "application/octet-stream");
|
||||
}
|
||||
|
||||
[HttpPost(MareFiles.ServerFiles_Upload + "/{hash}")]
|
||||
[RequestSizeLimit(200 * 1024 * 1024)]
|
||||
public async Task<IActionResult> UploadFile(string hash, CancellationToken requestAborted)
|
||||
|
||||
@@ -8,83 +8,182 @@ using MareSynchronos.API.Routes;
|
||||
|
||||
namespace MareSynchronosStaticFilesServer.Services;
|
||||
|
||||
public class CachedFileProvider
|
||||
public sealed class CachedFileProvider : IDisposable
|
||||
{
|
||||
private readonly IConfigurationService<StaticFilesServerConfiguration> _configuration;
|
||||
private readonly ILogger<CachedFileProvider> _logger;
|
||||
private readonly FileStatisticsService _fileStatisticsService;
|
||||
private readonly MareMetrics _metrics;
|
||||
private readonly ServerTokenGenerator _generator;
|
||||
private readonly ITouchHashService _touchService;
|
||||
private readonly Uri _remoteCacheSourceUri;
|
||||
private readonly string _basePath;
|
||||
private readonly bool _useColdStorage;
|
||||
private readonly string _hotStoragePath;
|
||||
private readonly string _coldStoragePath;
|
||||
private readonly ConcurrentDictionary<string, Task> _currentTransfers = new(StringComparer.Ordinal);
|
||||
private readonly HttpClient _httpClient;
|
||||
private bool IsMainServer => _remoteCacheSourceUri == null;
|
||||
private readonly SemaphoreSlim _downloadSemaphore = new(1, 1);
|
||||
private bool _disposed;
|
||||
|
||||
public CachedFileProvider(IConfigurationService<StaticFilesServerConfiguration> configuration, ILogger<CachedFileProvider> logger, FileStatisticsService fileStatisticsService, MareMetrics metrics, ServerTokenGenerator generator)
|
||||
private bool IsMainServer => _remoteCacheSourceUri == null && _isDistributionServer;
|
||||
private bool _isDistributionServer;
|
||||
|
||||
public CachedFileProvider(IConfigurationService<StaticFilesServerConfiguration> configuration, ILogger<CachedFileProvider> logger,
|
||||
FileStatisticsService fileStatisticsService, MareMetrics metrics, ServerTokenGenerator generator, ITouchHashService touchService)
|
||||
{
|
||||
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
|
||||
_configuration = configuration;
|
||||
_logger = logger;
|
||||
_fileStatisticsService = fileStatisticsService;
|
||||
_metrics = metrics;
|
||||
_generator = generator;
|
||||
_remoteCacheSourceUri = configuration.GetValueOrDefault<Uri>(nameof(StaticFilesServerConfiguration.MainFileServerAddress), null);
|
||||
_basePath = configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
|
||||
_httpClient = new HttpClient();
|
||||
_touchService = touchService;
|
||||
_remoteCacheSourceUri = configuration.GetValueOrDefault<Uri>(nameof(StaticFilesServerConfiguration.DistributionFileServerAddress), null);
|
||||
_isDistributionServer = configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.IsDistributionNode), false);
|
||||
_useColdStorage = configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false);
|
||||
_hotStoragePath = configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
|
||||
_coldStoragePath = configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.ColdStorageDirectory));
|
||||
_httpClient = new();
|
||||
_httpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("MareSynchronosServer", "1.0.0.0"));
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_disposed = true;
|
||||
_httpClient?.Dispose();
|
||||
}
|
||||
|
||||
private async Task DownloadTask(string hash)
|
||||
{
|
||||
// download file from remote
|
||||
var downloadUrl = MareFiles.ServerFilesGetFullPath(_remoteCacheSourceUri, hash);
|
||||
var destinationFilePath = FilePathUtil.GetFilePath(_useColdStorage ? _coldStoragePath : _hotStoragePath, hash);
|
||||
|
||||
// if cold storage is not configured or file not found or error is present try to download file from remote
|
||||
var downloadUrl = MareFiles.DistributionGetFullPath(_remoteCacheSourceUri, hash);
|
||||
_logger.LogInformation("Did not find {hash}, downloading from {server}", hash, downloadUrl);
|
||||
|
||||
using var requestMessage = new HttpRequestMessage(HttpMethod.Get, downloadUrl);
|
||||
requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _generator.Token);
|
||||
var response = await _httpClient.SendAsync(requestMessage).ConfigureAwait(false);
|
||||
if (_configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DistributionFileServerForceHTTP2), false))
|
||||
{
|
||||
requestMessage.Version = new Version(2, 0);
|
||||
requestMessage.VersionPolicy = HttpVersionPolicy.RequestVersionExact;
|
||||
}
|
||||
HttpResponseMessage? response = null;
|
||||
|
||||
try
|
||||
{
|
||||
response = await _httpClient.SendAsync(requestMessage).ConfigureAwait(false);
|
||||
response.EnsureSuccessStatusCode();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Failed to download {url}", downloadUrl);
|
||||
response?.Dispose();
|
||||
return;
|
||||
}
|
||||
|
||||
var fileName = FilePathUtil.GetFilePath(_basePath, hash);
|
||||
using var fileStream = new FileStream(fileName, FileMode.OpenOrCreate, FileAccess.ReadWrite);
|
||||
var bufferSize = response.Content.Headers.ContentLength > 1024 * 1024 ? 4096 : 1024;
|
||||
var tempFileName = destinationFilePath + ".dl";
|
||||
var fileStream = new FileStream(tempFileName, FileMode.Create, FileAccess.ReadWrite);
|
||||
var bufferSize = 4096;
|
||||
var buffer = new byte[bufferSize];
|
||||
|
||||
var bytesRead = 0;
|
||||
while ((bytesRead = await (await response.Content.ReadAsStreamAsync().ConfigureAwait(false)).ReadAsync(buffer).ConfigureAwait(false)) > 0)
|
||||
using var content = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
|
||||
while ((bytesRead = await content.ReadAsync(buffer).ConfigureAwait(false)) > 0)
|
||||
{
|
||||
await fileStream.WriteAsync(buffer.AsMemory(0, bytesRead)).ConfigureAwait(false);
|
||||
}
|
||||
await fileStream.FlushAsync().ConfigureAwait(false);
|
||||
await fileStream.DisposeAsync().ConfigureAwait(false);
|
||||
File.Move(tempFileName, destinationFilePath, true);
|
||||
|
||||
_metrics.IncGauge(MetricsAPI.GaugeFilesTotal);
|
||||
_metrics.IncGauge(MetricsAPI.GaugeFilesTotalSize, FilePathUtil.GetFileInfoForHash(_basePath, hash).Length);
|
||||
_metrics.IncGauge(_useColdStorage ? MetricsAPI.GaugeFilesTotalColdStorage : MetricsAPI.GaugeFilesTotal);
|
||||
_metrics.IncGauge(_useColdStorage ? MetricsAPI.GaugeFilesTotalSizeColdStorage : MetricsAPI.GaugeFilesTotalSize, new FileInfo(destinationFilePath).Length);
|
||||
response.Dispose();
|
||||
}
|
||||
|
||||
public void DownloadFileWhenRequired(string hash)
|
||||
private bool TryCopyFromColdStorage(string hash, string destinationFilePath)
|
||||
{
|
||||
var fi = FilePathUtil.GetFileInfoForHash(_basePath, hash);
|
||||
if (fi == null && IsMainServer) return;
|
||||
if (!_useColdStorage) return false;
|
||||
|
||||
if (fi == null && !_currentTransfers.ContainsKey(hash))
|
||||
if (string.IsNullOrEmpty(_coldStoragePath)) return false;
|
||||
|
||||
var coldStorageFilePath = FilePathUtil.GetFilePath(_coldStoragePath, hash);
|
||||
if (coldStorageFilePath == null) return false;
|
||||
|
||||
try
|
||||
{
|
||||
_logger.LogDebug("Copying {hash} from cold storage: {path}", hash, coldStorageFilePath);
|
||||
var tempFileName = destinationFilePath + ".dl";
|
||||
File.Copy(coldStorageFilePath, tempFileName, true);
|
||||
File.Move(tempFileName, destinationFilePath, true);
|
||||
var destinationFile = new FileInfo(destinationFilePath);
|
||||
destinationFile.LastAccessTimeUtc = DateTime.UtcNow;
|
||||
destinationFile.CreationTimeUtc = DateTime.UtcNow;
|
||||
destinationFile.LastWriteTimeUtc = DateTime.UtcNow;
|
||||
_metrics.IncGauge(MetricsAPI.GaugeFilesTotal);
|
||||
_metrics.IncGauge(MetricsAPI.GaugeFilesTotalSize, new FileInfo(destinationFilePath).Length);
|
||||
return true;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Could not copy {coldStoragePath} from cold storage", coldStorageFilePath);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public async Task DownloadFileWhenRequired(string hash)
|
||||
{
|
||||
var fi = FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash);
|
||||
|
||||
if (fi != null && fi.Length != 0)
|
||||
return;
|
||||
|
||||
// first check cold storage
|
||||
if (TryCopyFromColdStorage(hash, FilePathUtil.GetFilePath(_hotStoragePath, hash)))
|
||||
return;
|
||||
|
||||
// no distribution server configured to download from
|
||||
if (_remoteCacheSourceUri == null)
|
||||
return;
|
||||
|
||||
await _downloadSemaphore.WaitAsync().ConfigureAwait(false);
|
||||
if (!_currentTransfers.TryGetValue(hash, out var downloadTask) || (downloadTask?.IsCompleted ?? true))
|
||||
{
|
||||
_currentTransfers[hash] = Task.Run(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
_metrics.IncGauge(MetricsAPI.GaugeFilesDownloadingFromCache);
|
||||
await DownloadTask(hash).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Error during Download Task for {hash}", hash);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesDownloadingFromCache);
|
||||
_currentTransfers.Remove(hash, out _);
|
||||
}
|
||||
});
|
||||
}
|
||||
_downloadSemaphore.Release();
|
||||
}
|
||||
|
||||
public FileStream? GetLocalFileStream(string hash)
|
||||
{
|
||||
var fi = FilePathUtil.GetFileInfoForHash(_basePath, hash);
|
||||
var fi = FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash);
|
||||
if (fi == null) return null;
|
||||
fi.LastAccessTimeUtc = DateTime.UtcNow;
|
||||
|
||||
_touchService.TouchColdHash(hash);
|
||||
|
||||
_fileStatisticsService.LogFile(hash, fi.Length);
|
||||
|
||||
@@ -93,13 +192,38 @@ public class CachedFileProvider
|
||||
|
||||
public async Task<FileStream?> GetAndDownloadFileStream(string hash)
|
||||
{
|
||||
DownloadFileWhenRequired(hash);
|
||||
await DownloadFileWhenRequired(hash).ConfigureAwait(false);
|
||||
|
||||
if (_currentTransfers.TryGetValue(hash, out var downloadTask))
|
||||
{
|
||||
await downloadTask.ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
using CancellationTokenSource cts = new();
|
||||
cts.CancelAfter(TimeSpan.FromSeconds(120));
|
||||
_metrics.IncGauge(MetricsAPI.GaugeFilesTasksWaitingForDownloadFromCache);
|
||||
await downloadTask.WaitAsync(cts.Token).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_logger.LogWarning(e, "Failed while waiting for download task for {hash}", hash);
|
||||
return null;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTasksWaitingForDownloadFromCache);
|
||||
}
|
||||
}
|
||||
|
||||
return GetLocalFileStream(hash);
|
||||
}
|
||||
|
||||
public void TouchColdHash(string hash)
|
||||
{
|
||||
_touchService.TouchColdHash(hash);
|
||||
}
|
||||
|
||||
public bool AnyFilesDownloading(List<string> hashes)
|
||||
{
|
||||
return hashes.Exists(_currentTransfers.Keys.Contains);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
using MareSynchronosShared.Services;
|
||||
using MareSynchronosStaticFilesServer.Utils;
|
||||
|
||||
namespace MareSynchronosStaticFilesServer.Services;
|
||||
|
||||
// Perform access time updates for cold cache files accessed via hot cache or shard servers
|
||||
public class ColdTouchHashService : ITouchHashService
|
||||
{
|
||||
private readonly ILogger<ColdTouchHashService> _logger;
|
||||
private readonly IConfigurationService<StaticFilesServerConfiguration> _configuration;
|
||||
|
||||
private readonly bool _useColdStorage;
|
||||
private readonly string _coldStoragePath;
|
||||
|
||||
// Debounce multiple updates towards the same file
|
||||
private readonly Dictionary<string, DateTime> _lastUpdateTimesUtc = new(1009, StringComparer.Ordinal);
|
||||
private int _cleanupCounter = 0;
|
||||
private const double _debounceTimeSecs = 90.0;
|
||||
|
||||
public ColdTouchHashService(ILogger<ColdTouchHashService> logger, IConfigurationService<StaticFilesServerConfiguration> configuration)
|
||||
{
|
||||
_logger = logger;
|
||||
_configuration = configuration;
|
||||
_useColdStorage = configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false);
|
||||
_coldStoragePath = configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.ColdStorageDirectory));
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public void TouchColdHash(string hash)
|
||||
{
|
||||
if (!_useColdStorage)
|
||||
return;
|
||||
|
||||
var nowUtc = DateTime.UtcNow;
|
||||
|
||||
// Clean up debounce dictionary regularly
|
||||
if (_cleanupCounter++ >= 1000)
|
||||
{
|
||||
foreach (var entry in _lastUpdateTimesUtc.Where(entry => (nowUtc - entry.Value).TotalSeconds >= _debounceTimeSecs).ToList())
|
||||
_lastUpdateTimesUtc.Remove(entry.Key);
|
||||
_cleanupCounter = 0;
|
||||
}
|
||||
|
||||
// Ignore multiple updates within a 90 second window of the first
|
||||
if (_lastUpdateTimesUtc.TryGetValue(hash, out var lastUpdateTimeUtc) && (nowUtc - lastUpdateTimeUtc).TotalSeconds < _debounceTimeSecs)
|
||||
{
|
||||
_logger.LogDebug($"Debounced touch for {hash}");
|
||||
return;
|
||||
}
|
||||
|
||||
var fileInfo = FilePathUtil.GetFileInfoForHash(_coldStoragePath, hash);
|
||||
if (fileInfo != null)
|
||||
{
|
||||
_logger.LogDebug($"Touching {fileInfo.Name}");
|
||||
fileInfo.LastAccessTimeUtc = nowUtc;
|
||||
_lastUpdateTimesUtc.TryAdd(hash, nowUtc);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,79 +1,77 @@
|
||||
using ByteSizeLib;
|
||||
using MareSynchronos.API.Dto.Files;
|
||||
using MareSynchronosShared.Data;
|
||||
using MareSynchronosShared.Metrics;
|
||||
using MareSynchronosShared.Models;
|
||||
using MareSynchronosShared.Services;
|
||||
using MareSynchronosStaticFilesServer.Utils;
|
||||
using MessagePack.Formatters;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Hosting.Systemd;
|
||||
|
||||
namespace MareSynchronosStaticFilesServer.Services;
|
||||
|
||||
public class FileCleanupService : IHostedService
|
||||
{
|
||||
private readonly string _cacheDir;
|
||||
private readonly IConfigurationService<StaticFilesServerConfiguration> _configuration;
|
||||
private readonly bool _isMainServer;
|
||||
private readonly ILogger<FileCleanupService> _logger;
|
||||
private readonly MareMetrics _metrics;
|
||||
private readonly IServiceProvider _services;
|
||||
|
||||
private readonly string _hotStoragePath;
|
||||
private readonly string _coldStoragePath;
|
||||
private readonly bool _isMain = false;
|
||||
private readonly bool _isDistributionNode = false;
|
||||
private readonly bool _useColdStorage = false;
|
||||
private HashSet<string> _orphanedFiles = new(StringComparer.Ordinal);
|
||||
|
||||
private CancellationTokenSource _cleanupCts;
|
||||
|
||||
public FileCleanupService(MareMetrics metrics, ILogger<FileCleanupService> logger, IServiceProvider services, IConfigurationService<StaticFilesServerConfiguration> configuration)
|
||||
private int HotStorageMinimumRetention => _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.MinimumFileRetentionPeriodInDays), 7);
|
||||
private int HotStorageRetention => _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UnusedFileRetentionPeriodInDays), 14);
|
||||
private double HotStorageSize => _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.CacheSizeHardLimitInGiB), -1.0);
|
||||
|
||||
private int ColdStorageMinimumRetention => _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.ColdStorageMinimumFileRetentionPeriodInDays), 60);
|
||||
private int ColdStorageRetention => _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.ColdStorageUnusedFileRetentionPeriodInDays), 60);
|
||||
private double ColdStorageSize => _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.ColdStorageSizeHardLimitInGiB), -1.0);
|
||||
|
||||
private double SmallSizeKiB => _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.CacheSmallSizeThresholdKiB), 64.0);
|
||||
private double LargeSizeKiB => _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.CacheLargeSizeThresholdKiB), 1024.0);
|
||||
|
||||
private int ForcedDeletionAfterHours => _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.ForcedDeletionOfFilesAfterHours), -1);
|
||||
private int CleanupCheckMinutes => _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.CleanupCheckInMinutes), 15);
|
||||
|
||||
private List<FileInfo> GetAllHotFiles() => new DirectoryInfo(_hotStoragePath).GetFiles("*", SearchOption.AllDirectories)
|
||||
.Where(f => f != null && f.Name.Length == 40)
|
||||
.OrderBy(f => f.LastAccessTimeUtc).ToList();
|
||||
|
||||
private List<FileInfo> GetAllColdFiles() => new DirectoryInfo(_coldStoragePath).GetFiles("*", SearchOption.AllDirectories)
|
||||
.Where(f => f != null && f.Name.Length == 40)
|
||||
.OrderBy(f => f.LastAccessTimeUtc).ToList();
|
||||
|
||||
private List<FileInfo> GetTempFiles() => new DirectoryInfo(_useColdStorage ? _coldStoragePath : _hotStoragePath).GetFiles("*", SearchOption.AllDirectories)
|
||||
.Where(f => f != null && (f.Name.EndsWith(".dl", StringComparison.InvariantCultureIgnoreCase) || f.Name.EndsWith(".tmp", StringComparison.InvariantCultureIgnoreCase))).ToList();
|
||||
|
||||
public FileCleanupService(MareMetrics metrics, ILogger<FileCleanupService> logger,
|
||||
IServiceProvider services, IConfigurationService<StaticFilesServerConfiguration> configuration)
|
||||
{
|
||||
_metrics = metrics;
|
||||
_logger = logger;
|
||||
_services = services;
|
||||
_configuration = configuration;
|
||||
_isMainServer = configuration.IsMain;
|
||||
_cacheDir = _configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
|
||||
}
|
||||
|
||||
public async Task CleanUpTask(CancellationToken ct)
|
||||
{
|
||||
_logger.LogInformation("Starting periodic cleanup task");
|
||||
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
DirectoryInfo dir = new(_cacheDir);
|
||||
var allFiles = dir.GetFiles("*", SearchOption.AllDirectories);
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeFilesTotalSize, allFiles.Sum(f => f.Length));
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeFilesTotal, allFiles.Length);
|
||||
|
||||
using var scope = _services.CreateScope();
|
||||
using var dbContext = scope.ServiceProvider.GetService<MareDbContext>()!;
|
||||
|
||||
await CleanUpOutdatedFiles(dbContext, ct).ConfigureAwait(false);
|
||||
|
||||
CleanUpFilesBeyondSizeLimit(dbContext, ct);
|
||||
|
||||
if (_isMainServer)
|
||||
{
|
||||
CleanUpStuckUploads(dbContext);
|
||||
|
||||
await dbContext.SaveChangesAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_logger.LogError(e, "Error during cleanup task");
|
||||
}
|
||||
|
||||
var now = DateTime.Now;
|
||||
TimeOnly currentTime = new(now.Hour, now.Minute, now.Second);
|
||||
TimeOnly futureTime = new(now.Hour, now.Minute - now.Minute % 10, 0);
|
||||
var span = futureTime.AddMinutes(10) - currentTime;
|
||||
|
||||
_logger.LogInformation("File Cleanup Complete, next run at {date}", now.Add(span));
|
||||
await Task.Delay(span, ct).ConfigureAwait(false);
|
||||
}
|
||||
_useColdStorage = _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false);
|
||||
_hotStoragePath = configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
|
||||
_coldStoragePath = configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.ColdStorageDirectory));
|
||||
_isDistributionNode = configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.IsDistributionNode), false);
|
||||
_isMain = configuration.GetValue<Uri>(nameof(StaticFilesServerConfiguration.MainFileServerAddress)) == null && _isDistributionNode;
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_logger.LogInformation("Cleanup Service started");
|
||||
|
||||
InitializeGauges();
|
||||
|
||||
_cleanupCts = new();
|
||||
|
||||
_ = CleanUpTask(_cleanupCts.Token);
|
||||
@@ -88,145 +86,273 @@ public class FileCleanupService : IHostedService
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private void CleanUpFilesBeyondSizeLimit(MareDbContext dbContext, CancellationToken ct)
|
||||
private List<string> CleanUpFilesBeyondSizeLimit(List<FileInfo> files, double sizeLimit, double minTTL, double maxTTL, CancellationToken ct)
|
||||
{
|
||||
var sizeLimit = _configuration.GetValueOrDefault<double>(nameof(StaticFilesServerConfiguration.CacheSizeHardLimitInGiB), -1);
|
||||
var removedFiles = new List<string>();
|
||||
if (sizeLimit <= 0)
|
||||
{
|
||||
return;
|
||||
return removedFiles;
|
||||
}
|
||||
|
||||
var smallSize = SmallSizeKiB * 1024.0;
|
||||
var largeSize = LargeSizeKiB * 1024.0;
|
||||
var now = DateTime.Now;
|
||||
|
||||
// Avoid nonsense in future calculations
|
||||
if (smallSize < 0.0)
|
||||
smallSize = 0.0;
|
||||
|
||||
if (largeSize < smallSize)
|
||||
largeSize = smallSize;
|
||||
|
||||
if (minTTL < 0.0)
|
||||
minTTL = 0.0;
|
||||
|
||||
if (maxTTL < minTTL)
|
||||
maxTTL = minTTL;
|
||||
|
||||
// Calculates a deletion priority to prioritize deletion of larger files over a configured TTL range based on a file's size.
|
||||
// This is intended to be applied to the hot cache, as the cost of recovering many small files is greater than a single large file.
|
||||
// Example (minTTL=7, maxTTL=30):
|
||||
// - A 10MB file was last accessed 5 days ago. Its calculated optimum TTL is 7 days. result = 0.7143
|
||||
// - A 50kB file was last accessed 10 days ago. Its calculated optimum TTL is 30 days. result = 0.3333
|
||||
// The larger file will be deleted with a higher priority than the smaller file.
|
||||
double CalculateTTLProgression(FileInfo file)
|
||||
{
|
||||
var fileLength = (double)file.Length;
|
||||
var fileAgeDays = (now - file.LastAccessTime).TotalDays;
|
||||
var sizeNorm = Math.Clamp((fileLength - smallSize) / (largeSize - smallSize), 0.0, 1.0);
|
||||
// Using Math.Sqrt(sizeNorm) would create a more logical scaling curve, but it barely matters
|
||||
var ttlDayRange = (maxTTL - minTTL) * (1.0 - sizeNorm);
|
||||
var daysPastMinTTL = Math.Max(fileAgeDays - minTTL, 0.0);
|
||||
// There is some creativity in choosing an upper bound here:
|
||||
// - With no upper bound, any file larger than `largeSize` is always the highest priority for deletion once it passes its calculated TTL
|
||||
// - With 1.0 as an upper bound, all files older than `maxTTL` will have the same priority regardless of size
|
||||
// - Using maxTTL/minTTL chooses a logical cut-off point where any files old enough to be affected would have been cleaned up already
|
||||
var ttlProg = Math.Clamp(daysPastMinTTL / ttlDayRange, 0.0, maxTTL / minTTL);
|
||||
return ttlProg;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
// Since we already have the file list sorted by access time, the list index is incorporated in to
|
||||
// the dictionary key to preserve it as a secondary ordering
|
||||
var sortedFiles = new PriorityQueue<FileInfo, (double, int)>();
|
||||
|
||||
foreach (var (file, i) in files.Select((file, i) => ( file, i )))
|
||||
{
|
||||
double ttlProg = CalculateTTLProgression(file);
|
||||
sortedFiles.Enqueue(file, (-ttlProg, i));
|
||||
}
|
||||
|
||||
_logger.LogInformation("Cleaning up files beyond the cache size limit of {cacheSizeLimit} GiB", sizeLimit);
|
||||
var allLocalFiles = Directory.EnumerateFiles(_cacheDir, "*", SearchOption.AllDirectories)
|
||||
.Select(f => new FileInfo(f)).ToList()
|
||||
.OrderBy(f => f.LastAccessTimeUtc).ToList();
|
||||
var totalCacheSizeInBytes = allLocalFiles.Sum(s => s.Length);
|
||||
var totalCacheSizeInBytes = files.Sum(s => s.Length);
|
||||
long cacheSizeLimitInBytes = (long)ByteSize.FromGibiBytes(sizeLimit).Bytes;
|
||||
while (totalCacheSizeInBytes > cacheSizeLimitInBytes && allLocalFiles.Any() && !ct.IsCancellationRequested)
|
||||
while (totalCacheSizeInBytes > cacheSizeLimitInBytes && sortedFiles.Count != 0 && !ct.IsCancellationRequested)
|
||||
{
|
||||
var oldestFile = allLocalFiles[0];
|
||||
allLocalFiles.Remove(oldestFile);
|
||||
totalCacheSizeInBytes -= oldestFile.Length;
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, oldestFile.Length);
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
|
||||
_logger.LogInformation("Deleting {oldestFile} with size {size}MiB", oldestFile.FullName, ByteSize.FromBytes(oldestFile.Length).MebiBytes);
|
||||
oldestFile.Delete();
|
||||
if (_isMainServer)
|
||||
{
|
||||
FileCache f = new() { Hash = oldestFile.Name.ToUpperInvariant() };
|
||||
dbContext.Entry(f).State = EntityState.Deleted;
|
||||
}
|
||||
var file = sortedFiles.Dequeue();
|
||||
totalCacheSizeInBytes -= file.Length;
|
||||
_logger.LogInformation("Deleting {file} with size {size:N2}MiB", file.FullName, ByteSize.FromBytes(file.Length).MebiBytes);
|
||||
file.Delete();
|
||||
removedFiles.Add(file.Name);
|
||||
}
|
||||
files.RemoveAll(f => removedFiles.Contains(f.Name, StringComparer.InvariantCultureIgnoreCase));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Error during cache size limit cleanup");
|
||||
}
|
||||
|
||||
return removedFiles;
|
||||
}
|
||||
|
||||
private void CleanUpOrphanedFiles(List<FileCache> allFiles, FileInfo[] allPhysicalFiles, CancellationToken ct)
|
||||
private void CleanUpOrphanedFiles(HashSet<string> allDbFileHashes, List<FileInfo> allPhysicalFiles, CancellationToken ct)
|
||||
{
|
||||
if (_isMainServer)
|
||||
// To avoid race conditions with file uploads, only delete files on a second pass
|
||||
var newOrphanedFiles = new HashSet<string>(StringComparer.Ordinal);
|
||||
|
||||
foreach (var file in allPhysicalFiles.ToList())
|
||||
{
|
||||
var allFilesHashes = new HashSet<string>(allFiles.Select(a => a.Hash.ToUpperInvariant()), StringComparer.Ordinal);
|
||||
foreach (var file in allPhysicalFiles)
|
||||
if (!allDbFileHashes.Contains(file.Name.ToUpperInvariant()))
|
||||
{
|
||||
if (!allFilesHashes.Contains(file.Name.ToUpperInvariant()))
|
||||
{
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length);
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
|
||||
file.Delete();
|
||||
_logger.LogInformation("File not in DB, deleting: {fileName}", file.Name);
|
||||
_logger.LogInformation("File not in DB, marking: {fileName}", file.Name);
|
||||
newOrphanedFiles.Add(file.FullName);
|
||||
}
|
||||
|
||||
ct.ThrowIfCancellationRequested();
|
||||
}
|
||||
}
|
||||
|
||||
foreach (var fullName in _orphanedFiles.Where(f => newOrphanedFiles.Contains(f)))
|
||||
{
|
||||
var name = Path.GetFileName(fullName);
|
||||
File.Delete(fullName);
|
||||
_logger.LogInformation("File still not in DB, deleting: {fileName}", name);
|
||||
allPhysicalFiles.RemoveAll(f => f.FullName.Equals(fullName, StringComparison.InvariantCultureIgnoreCase));
|
||||
}
|
||||
|
||||
private async Task CleanUpOutdatedFiles(MareDbContext dbContext, CancellationToken ct)
|
||||
_orphanedFiles = newOrphanedFiles;
|
||||
}
|
||||
|
||||
private List<string> CleanUpOutdatedFiles(List<FileInfo> files, int unusedRetention, int forcedDeletionAfterHours, CancellationToken ct)
|
||||
{
|
||||
var removedFiles = new List<string>();
|
||||
try
|
||||
{
|
||||
var unusedRetention = _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UnusedFileRetentionPeriodInDays), 14);
|
||||
var forcedDeletionAfterHours = _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.ForcedDeletionOfFilesAfterHours), -1);
|
||||
|
||||
_logger.LogInformation("Cleaning up files older than {filesOlderThanDays} days", unusedRetention);
|
||||
if (forcedDeletionAfterHours > 0)
|
||||
{
|
||||
_logger.LogInformation("Cleaning up files written to longer than {hours}h ago", forcedDeletionAfterHours);
|
||||
}
|
||||
|
||||
// clean up files in DB but not on disk or last access is expired
|
||||
var prevTime = DateTime.Now.Subtract(TimeSpan.FromDays(unusedRetention));
|
||||
var prevTimeForcedDeletion = DateTime.Now.Subtract(TimeSpan.FromHours(forcedDeletionAfterHours));
|
||||
DirectoryInfo dir = new(_cacheDir);
|
||||
var allFilesInDir = dir.GetFiles("*", SearchOption.AllDirectories);
|
||||
var allFiles = await dbContext.Files.ToListAsync().ConfigureAwait(false);
|
||||
int fileCounter = 0;
|
||||
foreach (var fileCache in allFiles.Where(f => f.Uploaded))
|
||||
{
|
||||
var file = FilePathUtil.GetFileInfoForHash(_cacheDir, fileCache.Hash);
|
||||
bool fileDeleted = false;
|
||||
if (file == null && _isMainServer)
|
||||
{
|
||||
_logger.LogInformation("File does not exist anymore: {fileName}", fileCache.Hash);
|
||||
dbContext.Files.Remove(fileCache);
|
||||
fileDeleted = true;
|
||||
}
|
||||
else if (file != null && file.LastAccessTime < prevTime)
|
||||
{
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length);
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
|
||||
_logger.LogInformation("File outdated: {fileName}, {fileSize}MiB", file.Name, ByteSize.FromBytes(file.Length).MebiBytes);
|
||||
file.Delete();
|
||||
if (_isMainServer)
|
||||
{
|
||||
fileDeleted = true;
|
||||
dbContext.Files.Remove(fileCache);
|
||||
}
|
||||
}
|
||||
else if (file != null && forcedDeletionAfterHours > 0 && file.LastWriteTime < prevTimeForcedDeletion)
|
||||
{
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length);
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
|
||||
_logger.LogInformation("File forcefully deleted: {fileName}, {fileSize}MiB", file.Name, ByteSize.FromBytes(file.Length).MebiBytes);
|
||||
file.Delete();
|
||||
if (_isMainServer)
|
||||
{
|
||||
fileDeleted = true;
|
||||
dbContext.Files.Remove(fileCache);
|
||||
}
|
||||
}
|
||||
var lastAccessCutoffTime = DateTime.Now.Subtract(TimeSpan.FromDays(unusedRetention));
|
||||
var forcedDeletionCutoffTime = DateTime.Now.Subtract(TimeSpan.FromHours(forcedDeletionAfterHours));
|
||||
|
||||
if (_isMainServer && !fileDeleted && file != null && fileCache.Size == 0)
|
||||
foreach (var file in files)
|
||||
{
|
||||
_logger.LogInformation("Setting File Size of " + fileCache.Hash + " to " + file.Length);
|
||||
fileCache.Size = file.Length;
|
||||
// commit every 1000 files to db
|
||||
if (fileCounter % 1000 == 0) await dbContext.SaveChangesAsync().ConfigureAwait(false);
|
||||
if (file.LastAccessTime < lastAccessCutoffTime)
|
||||
{
|
||||
_logger.LogInformation("File outdated: {fileName}, {fileSize:N2}MiB", file.Name, ByteSize.FromBytes(file.Length).MebiBytes);
|
||||
file.Delete();
|
||||
removedFiles.Add(file.Name);
|
||||
}
|
||||
else if (forcedDeletionAfterHours > 0 && file.LastWriteTime < forcedDeletionCutoffTime)
|
||||
{
|
||||
_logger.LogInformation("File forcefully deleted: {fileName}, {fileSize:N2}MiB", file.Name, ByteSize.FromBytes(file.Length).MebiBytes);
|
||||
file.Delete();
|
||||
removedFiles.Add(file.Name);
|
||||
}
|
||||
|
||||
fileCounter++;
|
||||
|
||||
ct.ThrowIfCancellationRequested();
|
||||
}
|
||||
|
||||
// clean up files that are on disk but not in DB for some reason
|
||||
CleanUpOrphanedFiles(allFiles, allFilesInDir, ct);
|
||||
files.RemoveAll(f => removedFiles.Contains(f.Name, StringComparer.InvariantCultureIgnoreCase));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Error during file cleanup of old files");
|
||||
}
|
||||
|
||||
return removedFiles;
|
||||
}
|
||||
|
||||
private void CleanUpStuckUploads(MareDbContext dbContext)
|
||||
private void CleanUpTempFiles()
|
||||
{
|
||||
var pastTime = DateTime.UtcNow.Subtract(TimeSpan.FromMinutes(10));
|
||||
var stuckUploads = dbContext.Files.Where(f => !f.Uploaded && f.UploadDate < pastTime);
|
||||
dbContext.Files.RemoveRange(stuckUploads);
|
||||
var pastTime = DateTime.UtcNow.Subtract(TimeSpan.FromMinutes(20));
|
||||
var tempFiles = GetTempFiles();
|
||||
foreach (var tempFile in tempFiles.Where(f => f.LastWriteTimeUtc < pastTime))
|
||||
tempFile.Delete();
|
||||
}
|
||||
|
||||
private async Task CleanUpTask(CancellationToken ct)
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
using var scope = _services.CreateScope();
|
||||
using var dbContext = _isMain ? scope.ServiceProvider.GetService<MareDbContext>()! : null;
|
||||
|
||||
HashSet<string> allDbFileHashes = null;
|
||||
|
||||
// Database operations only performed on main server
|
||||
if (_isMain)
|
||||
{
|
||||
var allDbFiles = await dbContext.Files.ToListAsync(ct).ConfigureAwait(false);
|
||||
allDbFileHashes = new HashSet<string>(allDbFiles.Select(a => a.Hash.ToUpperInvariant()), StringComparer.Ordinal);
|
||||
}
|
||||
|
||||
if (_useColdStorage)
|
||||
{
|
||||
var coldFiles = GetAllColdFiles();
|
||||
var removedColdFiles = new List<string>();
|
||||
|
||||
removedColdFiles.AddRange(
|
||||
CleanUpOutdatedFiles(coldFiles, ColdStorageRetention, ForcedDeletionAfterHours, ct)
|
||||
);
|
||||
removedColdFiles.AddRange(
|
||||
CleanUpFilesBeyondSizeLimit(coldFiles, ColdStorageSize, ColdStorageMinimumRetention, ColdStorageRetention, ct)
|
||||
);
|
||||
|
||||
// Remove cold storage files are deleted from the database, if we are the main file server
|
||||
if (_isMain)
|
||||
{
|
||||
dbContext.Files.RemoveRange(
|
||||
dbContext.Files.Where(f => removedColdFiles.Contains(f.Hash))
|
||||
);
|
||||
allDbFileHashes.ExceptWith(removedColdFiles);
|
||||
CleanUpOrphanedFiles(allDbFileHashes, coldFiles, ct);
|
||||
}
|
||||
|
||||
// Remove hot copies of files now that the authoritative copy is gone
|
||||
foreach (var removedFile in removedColdFiles)
|
||||
{
|
||||
var hotFile = FilePathUtil.GetFileInfoForHash(_hotStoragePath, removedFile);
|
||||
hotFile?.Delete();
|
||||
}
|
||||
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeFilesTotalSizeColdStorage, coldFiles.Sum(f => { try { return f.Length; } catch { return 0; } }));
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeFilesTotalColdStorage, coldFiles.Count);
|
||||
}
|
||||
|
||||
var hotFiles = GetAllHotFiles();
|
||||
var removedHotFiles = new List<string>();
|
||||
|
||||
removedHotFiles.AddRange(
|
||||
CleanUpOutdatedFiles(hotFiles, HotStorageRetention, forcedDeletionAfterHours: _useColdStorage ? ForcedDeletionAfterHours : -1, ct)
|
||||
);
|
||||
removedHotFiles.AddRange(
|
||||
CleanUpFilesBeyondSizeLimit(hotFiles, HotStorageSize, HotStorageMinimumRetention, HotStorageRetention, ct)
|
||||
);
|
||||
|
||||
if (_isMain)
|
||||
{
|
||||
// If cold storage is not active, then "hot" files are deleted from the database instead
|
||||
if (!_useColdStorage)
|
||||
{
|
||||
dbContext.Files.RemoveRange(
|
||||
dbContext.Files.Where(f => removedHotFiles.Contains(f.Hash))
|
||||
);
|
||||
allDbFileHashes.ExceptWith(removedHotFiles);
|
||||
}
|
||||
|
||||
CleanUpOrphanedFiles(allDbFileHashes, hotFiles, ct);
|
||||
|
||||
await dbContext.SaveChangesAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeFilesTotalSize, hotFiles.Sum(f => { try { return f.Length; } catch { return 0; } }));
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeFilesTotal, hotFiles.Count);
|
||||
|
||||
CleanUpTempFiles();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_logger.LogError(e, "Error during cleanup task");
|
||||
}
|
||||
|
||||
var cleanupCheckMinutes = CleanupCheckMinutes;
|
||||
var now = DateTime.Now;
|
||||
TimeOnly currentTime = new(now.Hour, now.Minute, now.Second);
|
||||
TimeOnly futureTime = new(now.Hour, now.Minute - now.Minute % cleanupCheckMinutes, 0);
|
||||
var span = futureTime.AddMinutes(cleanupCheckMinutes) - currentTime;
|
||||
|
||||
_logger.LogInformation("File Cleanup Complete, next run at {date}", now.Add(span));
|
||||
await Task.Delay(span, ct).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private void InitializeGauges()
|
||||
{
|
||||
if (_useColdStorage)
|
||||
{
|
||||
var allFilesInColdStorageDir = GetAllColdFiles();
|
||||
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeFilesTotalSizeColdStorage, allFilesInColdStorageDir.Sum(f => f.Length));
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeFilesTotalColdStorage, allFilesInColdStorageDir.Count);
|
||||
}
|
||||
|
||||
var allFilesInHotStorage = GetAllHotFiles();
|
||||
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeFilesTotalSize, allFilesInHotStorage.Sum(f => { try { return f.Length; } catch { return 0; } }));
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeFilesTotal, allFilesInHotStorage.Count);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
namespace MareSynchronosStaticFilesServer.Services;
|
||||
|
||||
public interface IClientReadyMessageService
|
||||
{
|
||||
void SendDownloadReady(string uid, Guid requestId);
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
namespace MareSynchronosStaticFilesServer.Services;
|
||||
|
||||
public interface ITouchHashService : IHostedService
|
||||
{
|
||||
void TouchColdHash(string hash);
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using MareSynchronos.API.SignalR;
|
||||
using MareSynchronosServer.Hubs;
|
||||
|
||||
namespace MareSynchronosStaticFilesServer.Services;
|
||||
|
||||
public class MainClientReadyMessageService : IClientReadyMessageService
|
||||
{
|
||||
private readonly ILogger<MainClientReadyMessageService> _logger;
|
||||
private readonly IHubContext<MareHub> _mareHub;
|
||||
|
||||
public MainClientReadyMessageService(ILogger<MainClientReadyMessageService> logger, IHubContext<MareHub> mareHub)
|
||||
{
|
||||
_logger = logger;
|
||||
_mareHub = mareHub;
|
||||
}
|
||||
|
||||
public void SendDownloadReady(string uid, Guid requestId)
|
||||
{
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
_logger.LogDebug("Sending Client Ready for {uid}:{requestId} to SignalR", uid, requestId);
|
||||
await _mareHub.Clients.User(uid).SendAsync(nameof(IMareHub.Client_DownloadReady), requestId).ConfigureAwait(false);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1,37 +1,29 @@
|
||||
using MareSynchronosShared.Metrics;
|
||||
using MareSynchronosShared.Services;
|
||||
using MareSynchronosStaticFilesServer.Utils;
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Timers;
|
||||
using MareSynchronos.API.SignalR;
|
||||
using MareSynchronosShared.Data;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using System.Linq;
|
||||
using System.Timers;
|
||||
|
||||
namespace MareSynchronosStaticFilesServer.Services;
|
||||
|
||||
public class RequestQueueService : IHostedService
|
||||
{
|
||||
private record PriorityEntry(bool IsHighPriority, DateTime LastChecked);
|
||||
|
||||
private readonly IHubContext<MareSynchronosServer.Hubs.MareHub> _hubContext;
|
||||
private readonly IClientReadyMessageService _clientReadyMessageService;
|
||||
private readonly CachedFileProvider _cachedFileProvider;
|
||||
private readonly ILogger<RequestQueueService> _logger;
|
||||
private readonly MareMetrics _metrics;
|
||||
private readonly ConcurrentQueue<UserRequest> _queue = new();
|
||||
private readonly ConcurrentQueue<UserRequest> _priorityQueue = new();
|
||||
private readonly int _queueExpirationSeconds;
|
||||
private readonly SemaphoreSlim _queueProcessingSemaphore = new(1);
|
||||
private readonly ConcurrentDictionary<Guid, string> _queueRemoval = new();
|
||||
private readonly SemaphoreSlim _queueSemaphore = new(1);
|
||||
private readonly UserQueueEntry[] _userQueueRequests;
|
||||
private readonly ConcurrentDictionary<string, PriorityEntry> _priorityCache = new(StringComparer.Ordinal);
|
||||
private int _queueLimitForReset;
|
||||
private readonly int _queueReleaseSeconds;
|
||||
private System.Timers.Timer _queueTimer;
|
||||
|
||||
public RequestQueueService(MareMetrics metrics, IConfigurationService<StaticFilesServerConfiguration> configurationService,
|
||||
ILogger<RequestQueueService> logger, IHubContext<MareSynchronosServer.Hubs.MareHub> hubContext)
|
||||
ILogger<RequestQueueService> logger, IClientReadyMessageService hubContext, CachedFileProvider cachedFileProvider)
|
||||
{
|
||||
_userQueueRequests = new UserQueueEntry[configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueSize), 50)];
|
||||
_queueExpirationSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadTimeoutSeconds), 5);
|
||||
@@ -39,7 +31,8 @@ public class RequestQueueService : IHostedService
|
||||
_queueReleaseSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueReleaseSeconds), 15);
|
||||
_metrics = metrics;
|
||||
_logger = logger;
|
||||
_hubContext = hubContext;
|
||||
_clientReadyMessageService = hubContext;
|
||||
_cachedFileProvider = cachedFileProvider;
|
||||
}
|
||||
|
||||
public void ActivateRequest(Guid request)
|
||||
@@ -49,50 +42,16 @@ public class RequestQueueService : IHostedService
|
||||
req.MarkActive();
|
||||
}
|
||||
|
||||
private async Task<bool> IsHighPriority(string uid, MareDbContext mareDbContext)
|
||||
public async Task EnqueueUser(UserRequest request, bool isPriority, CancellationToken token)
|
||||
{
|
||||
return false;
|
||||
if (!_priorityCache.TryGetValue(uid, out PriorityEntry entry) || entry.LastChecked.Add(TimeSpan.FromHours(6)) < DateTime.UtcNow)
|
||||
while (_queueProcessingSemaphore.CurrentCount == 0)
|
||||
{
|
||||
var user = await mareDbContext.Users.FirstOrDefaultAsync(u => u.UID == uid).ConfigureAwait(false);
|
||||
entry = new(user != null && !string.IsNullOrEmpty(user.Alias), DateTime.UtcNow);
|
||||
_priorityCache[uid] = entry;
|
||||
await Task.Delay(50, token).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
return entry.IsHighPriority;
|
||||
}
|
||||
|
||||
public async Task EnqueueUser(UserRequest request, MareDbContext mareDbContext)
|
||||
{
|
||||
_logger.LogDebug("Enqueueing req {guid} from {user} for {file}", request.RequestId, request.User, string.Join(", ", request.FileIds));
|
||||
|
||||
bool isPriorityQueue = await IsHighPriority(request.User, mareDbContext).ConfigureAwait(false);
|
||||
|
||||
if (_queueProcessingSemaphore.CurrentCount == 0)
|
||||
{
|
||||
if (isPriorityQueue) _priorityQueue.Enqueue(request);
|
||||
else _queue.Enqueue(request);
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await _queueSemaphore.WaitAsync().ConfigureAwait(false);
|
||||
if (isPriorityQueue) _priorityQueue.Enqueue(request);
|
||||
else _queue.Enqueue(request);
|
||||
|
||||
return;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error during EnqueueUser");
|
||||
}
|
||||
finally
|
||||
{
|
||||
_queueSemaphore.Release();
|
||||
}
|
||||
|
||||
throw new Exception("Error during EnqueueUser");
|
||||
GetQueue(isPriority).Enqueue(request);
|
||||
}
|
||||
|
||||
public void FinishRequest(Guid request)
|
||||
@@ -117,9 +76,10 @@ public class RequestQueueService : IHostedService
|
||||
return userQueueRequest != null && userRequest != null && userQueueRequest.ExpirationDate > DateTime.UtcNow;
|
||||
}
|
||||
|
||||
public void RemoveFromQueue(Guid requestId, string user)
|
||||
public void RemoveFromQueue(Guid requestId, string user, bool isPriority)
|
||||
{
|
||||
if (!_queue.Any(f => f.RequestId == requestId && string.Equals(f.User, user, StringComparison.Ordinal)))
|
||||
var existingRequest = GetQueue(isPriority).FirstOrDefault(f => f.RequestId == requestId && string.Equals(f.User, user, StringComparison.Ordinal));
|
||||
if (existingRequest == null)
|
||||
{
|
||||
var activeSlot = _userQueueRequests.FirstOrDefault(r => r != null && string.Equals(r.UserRequest.User, user, StringComparison.Ordinal) && r.UserRequest.RequestId == requestId);
|
||||
if (activeSlot != null)
|
||||
@@ -130,29 +90,27 @@ public class RequestQueueService : IHostedService
|
||||
_userQueueRequests[idx] = null;
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
_queueRemoval[requestId] = user;
|
||||
else
|
||||
{
|
||||
existingRequest.IsCancelled = true;
|
||||
}
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_queueTimer = new System.Timers.Timer(250);
|
||||
_queueTimer = new System.Timers.Timer(500);
|
||||
_queueTimer.Elapsed += ProcessQueue;
|
||||
_queueTimer.AutoReset = true;
|
||||
_queueTimer.Start();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public async Task<bool> StillEnqueued(Guid request, string user, MareDbContext mareDbContext)
|
||||
private ConcurrentQueue<UserRequest> GetQueue(bool isPriority) => isPriority ? _priorityQueue : _queue;
|
||||
|
||||
public bool StillEnqueued(Guid request, string user, bool isPriority)
|
||||
{
|
||||
bool isPriorityQueue = await IsHighPriority(user, mareDbContext).ConfigureAwait(false);
|
||||
if (isPriorityQueue)
|
||||
{
|
||||
return _priorityQueue.Any(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal));
|
||||
}
|
||||
return _queue.Any(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal));
|
||||
return GetQueue(isPriority).Any(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal));
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
@@ -161,92 +119,88 @@ public class RequestQueueService : IHostedService
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private async Task DequeueIntoSlotAsync(UserRequest userRequest, int slot)
|
||||
private void DequeueIntoSlot(UserRequest userRequest, int slot)
|
||||
{
|
||||
_logger.LogDebug("Dequeueing {req} into {i}: {user} with {file}", userRequest.RequestId, slot, userRequest.User, string.Join(", ", userRequest.FileIds));
|
||||
_userQueueRequests[slot] = new(userRequest, DateTime.UtcNow.AddSeconds(_queueExpirationSeconds));
|
||||
await _hubContext.Clients.User(userRequest.User).SendAsync(nameof(IMareHub.Client_DownloadReady), userRequest.RequestId).ConfigureAwait(false);
|
||||
_clientReadyMessageService.SendDownloadReady(userRequest.User, userRequest.RequestId);
|
||||
}
|
||||
|
||||
private async void ProcessQueue(object src, ElapsedEventArgs e)
|
||||
private void ProcessQueue(object src, ElapsedEventArgs e)
|
||||
{
|
||||
if (_queueProcessingSemaphore.CurrentCount == 0) return;
|
||||
|
||||
await _queueProcessingSemaphore.WaitAsync().ConfigureAwait(false);
|
||||
_queueProcessingSemaphore.Wait();
|
||||
|
||||
try
|
||||
{
|
||||
if (_queue.Count > _queueLimitForReset)
|
||||
if (_queue.Count(c => !c.IsCancelled) > _queueLimitForReset)
|
||||
{
|
||||
_queue.Clear();
|
||||
return;
|
||||
}
|
||||
|
||||
Parallel.For(0, _userQueueRequests.Length, new ParallelOptions()
|
||||
{
|
||||
MaxDegreeOfParallelism = 10,
|
||||
},
|
||||
async (i) =>
|
||||
for (int i = 0; i < _userQueueRequests.Length; i++)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (_userQueueRequests[i] != null && ((!_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow)))
|
||||
if (_userQueueRequests[i] != null
|
||||
&& (((!_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow))
|
||||
|| (_userQueueRequests[i].IsActive && _userQueueRequests[i].ActivationDate < DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(_queueReleaseSeconds))))
|
||||
)
|
||||
{
|
||||
_logger.LogDebug("Expiring inactive request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i);
|
||||
_logger.LogDebug("Expiring request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i);
|
||||
_userQueueRequests[i] = null;
|
||||
}
|
||||
|
||||
if (_userQueueRequests[i] != null && (_userQueueRequests[i].IsActive && _userQueueRequests[i].ActivationDate < DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(_queueReleaseSeconds))))
|
||||
{
|
||||
_logger.LogDebug("Expiring active request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i);
|
||||
_userQueueRequests[i] = null;
|
||||
}
|
||||
if (_userQueueRequests[i] != null) continue;
|
||||
|
||||
if (!_queue.Any()) return;
|
||||
|
||||
if (_userQueueRequests[i] == null)
|
||||
while (true)
|
||||
{
|
||||
bool enqueued = false;
|
||||
while (!enqueued)
|
||||
if (!_priorityQueue.All(u => _cachedFileProvider.AnyFilesDownloading(u.FileIds))
|
||||
&& _priorityQueue.TryDequeue(out var prioRequest))
|
||||
{
|
||||
if (_priorityQueue.TryDequeue(out var prioRequest))
|
||||
if (prioRequest.IsCancelled)
|
||||
{
|
||||
if (_queueRemoval.TryGetValue(prioRequest.RequestId, out string user) && string.Equals(user, prioRequest.User, StringComparison.Ordinal))
|
||||
{
|
||||
_logger.LogDebug("Request cancelled: {requestId} by {user}", prioRequest.RequestId, user);
|
||||
_queueRemoval.Remove(prioRequest.RequestId, out _);
|
||||
continue;
|
||||
}
|
||||
|
||||
await DequeueIntoSlotAsync(prioRequest, i).ConfigureAwait(false);
|
||||
enqueued = true;
|
||||
if (_cachedFileProvider.AnyFilesDownloading(prioRequest.FileIds))
|
||||
{
|
||||
_priorityQueue.Enqueue(prioRequest);
|
||||
continue;
|
||||
}
|
||||
|
||||
DequeueIntoSlot(prioRequest, i);
|
||||
break;
|
||||
}
|
||||
|
||||
if (_queue.TryDequeue(out var request))
|
||||
if (!_queue.All(u => _cachedFileProvider.AnyFilesDownloading(u.FileIds))
|
||||
&& _queue.TryDequeue(out var request))
|
||||
{
|
||||
if (_queueRemoval.TryGetValue(request.RequestId, out string user) && string.Equals(user, request.User, StringComparison.Ordinal))
|
||||
if (request.IsCancelled)
|
||||
{
|
||||
_logger.LogDebug("Request cancelled: {requestId} by {user}", request.RequestId, user);
|
||||
_queueRemoval.Remove(request.RequestId, out _);
|
||||
continue;
|
||||
}
|
||||
|
||||
await DequeueIntoSlotAsync(request, i).ConfigureAwait(false);
|
||||
enqueued = true;
|
||||
}
|
||||
else
|
||||
if (_cachedFileProvider.AnyFilesDownloading(request.FileIds))
|
||||
{
|
||||
enqueued = true;
|
||||
_queue.Enqueue(request);
|
||||
continue;
|
||||
}
|
||||
|
||||
DequeueIntoSlot(request, i);
|
||||
break;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Error during inside queue processing");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -260,6 +214,9 @@ public class RequestQueueService : IHostedService
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeQueueFree, _userQueueRequests.Count(c => c == null));
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeQueueActive, _userQueueRequests.Count(c => c != null && c.IsActive));
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeQueueInactive, _userQueueRequests.Count(c => c != null && !c.IsActive));
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueue, _queue.Count);
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueue, _queue.Count(q => !q.IsCancelled));
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueueCancelled, _queue.Count(q => q.IsCancelled));
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadPriorityQueue, _priorityQueue.Count(q => !q.IsCancelled));
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadPriorityQueueCancelled, _priorityQueue.Count(q => q.IsCancelled));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
using MareSynchronos.API.Routes;
|
||||
using MareSynchronosShared.Services;
|
||||
using MareSynchronosShared.Utils;
|
||||
using System.Net.Http.Headers;
|
||||
|
||||
namespace MareSynchronosStaticFilesServer.Services;
|
||||
|
||||
public class ShardClientReadyMessageService : IClientReadyMessageService
|
||||
{
|
||||
private readonly ILogger<ShardClientReadyMessageService> _logger;
|
||||
private readonly ServerTokenGenerator _tokenGenerator;
|
||||
private readonly IConfigurationService<StaticFilesServerConfiguration> _configurationService;
|
||||
private readonly HttpClient _httpClient;
|
||||
|
||||
public ShardClientReadyMessageService(ILogger<ShardClientReadyMessageService> logger, ServerTokenGenerator tokenGenerator, IConfigurationService<StaticFilesServerConfiguration> configurationService)
|
||||
{
|
||||
_logger = logger;
|
||||
_tokenGenerator = tokenGenerator;
|
||||
_configurationService = configurationService;
|
||||
_httpClient = new();
|
||||
_httpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("MareSynchronosServer", "1.0.0.0"));
|
||||
}
|
||||
|
||||
public void SendDownloadReady(string uid, Guid requestId)
|
||||
{
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
var mainUrlConfigKey = _configurationService.GetValue<bool>(nameof(StaticFilesServerConfiguration.NotifyMainServerDirectly))
|
||||
? nameof(StaticFilesServerConfiguration.MainServerAddress)
|
||||
: nameof(StaticFilesServerConfiguration.MainFileServerAddress);
|
||||
var mainUrl = _configurationService.GetValue<Uri>(mainUrlConfigKey);
|
||||
var path = MareFiles.MainSendReadyFullPath(mainUrl, uid, requestId);
|
||||
using HttpRequestMessage msg = new()
|
||||
{
|
||||
RequestUri = path
|
||||
};
|
||||
msg.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _tokenGenerator.Token);
|
||||
|
||||
_logger.LogDebug("Sending Client Ready for {uid}:{requestId} to {path}", uid, requestId, path);
|
||||
try
|
||||
{
|
||||
using var result = await _httpClient.SendAsync(msg).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failure to send for {uid}:{requestId}", uid, requestId);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,131 @@
|
||||
using MareSynchronos.API.Routes;
|
||||
using MareSynchronosShared.Services;
|
||||
using MareSynchronosShared.Utils;
|
||||
using System.Net.Http.Headers;
|
||||
|
||||
namespace MareSynchronosStaticFilesServer.Services;
|
||||
|
||||
// Notify distribution server of file hashes downloaded via shards, so they are not prematurely purged from its cold cache
|
||||
public class ShardTouchMessageService : ITouchHashService
|
||||
{
|
||||
private readonly ILogger<ShardTouchMessageService> _logger;
|
||||
private readonly ServerTokenGenerator _tokenGenerator;
|
||||
private readonly IConfigurationService<StaticFilesServerConfiguration> _configuration;
|
||||
private readonly HttpClient _httpClient;
|
||||
private readonly Uri _remoteCacheSourceUri;
|
||||
private readonly HashSet<string> _touchHashSet = new();
|
||||
private readonly ColdTouchHashService _nestedService = null;
|
||||
|
||||
private CancellationTokenSource _touchmsgCts;
|
||||
|
||||
public ShardTouchMessageService(ILogger<ShardTouchMessageService> logger, ILogger<ColdTouchHashService> nestedLogger,
|
||||
ServerTokenGenerator tokenGenerator, IConfigurationService<StaticFilesServerConfiguration> configuration)
|
||||
{
|
||||
_logger = logger;
|
||||
_tokenGenerator = tokenGenerator;
|
||||
_configuration = configuration;
|
||||
_remoteCacheSourceUri = _configuration.GetValueOrDefault<Uri>(nameof(StaticFilesServerConfiguration.DistributionFileServerAddress), null);
|
||||
_httpClient = new();
|
||||
_httpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("MareSynchronosServer", "1.0.0.0"));
|
||||
|
||||
if (configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false))
|
||||
{
|
||||
_nestedService = new ColdTouchHashService(nestedLogger, configuration);
|
||||
}
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_remoteCacheSourceUri == null)
|
||||
return Task.CompletedTask;
|
||||
|
||||
_logger.LogInformation("Touch Message Service started");
|
||||
|
||||
_touchmsgCts = new();
|
||||
|
||||
_ = TouchMessageTask(_touchmsgCts.Token);
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_remoteCacheSourceUri == null)
|
||||
return Task.CompletedTask;
|
||||
|
||||
_touchmsgCts.Cancel();
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private async Task SendTouches(IEnumerable<string> hashes)
|
||||
{
|
||||
var mainUrl = _remoteCacheSourceUri;
|
||||
var path = new Uri(mainUrl, MareFiles.Distribution + "/touch");
|
||||
using HttpRequestMessage msg = new()
|
||||
{
|
||||
RequestUri = path
|
||||
};
|
||||
msg.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _tokenGenerator.Token);
|
||||
msg.Method = HttpMethod.Post;
|
||||
msg.Content = JsonContent.Create(hashes);
|
||||
if (_configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DistributionFileServerForceHTTP2), false))
|
||||
{
|
||||
msg.Version = new Version(2, 0);
|
||||
msg.VersionPolicy = HttpVersionPolicy.RequestVersionExact;
|
||||
}
|
||||
|
||||
_logger.LogDebug("Sending remote touch to {path}", path);
|
||||
try
|
||||
{
|
||||
using var result = await _httpClient.SendAsync(msg).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failure to send touches for {hashChunk}", hashes);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task TouchMessageTask(CancellationToken ct)
|
||||
{
|
||||
List<string> hashes;
|
||||
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
lock (_touchHashSet)
|
||||
{
|
||||
hashes = _touchHashSet.ToList();
|
||||
_touchHashSet.Clear();
|
||||
}
|
||||
if (hashes.Count > 0)
|
||||
await SendTouches(hashes);
|
||||
await Task.Delay(TimeSpan.FromSeconds(30), ct).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_logger.LogError(e, "Error during touch message task");
|
||||
}
|
||||
}
|
||||
|
||||
lock (_touchHashSet)
|
||||
{
|
||||
hashes = _touchHashSet.ToList();
|
||||
_touchHashSet.Clear();
|
||||
}
|
||||
if (hashes.Count > 0)
|
||||
await SendTouches(hashes);
|
||||
}
|
||||
|
||||
public void TouchColdHash(string hash)
|
||||
{
|
||||
if (_nestedService != null)
|
||||
_nestedService.TouchColdHash(hash);
|
||||
|
||||
lock (_touchHashSet)
|
||||
{
|
||||
_touchHashSet.Add(hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -26,6 +26,7 @@ namespace MareSynchronosStaticFilesServer;
|
||||
public class Startup
|
||||
{
|
||||
private bool _isMain;
|
||||
private bool _isDistributionNode;
|
||||
private readonly ILogger<Startup> _logger;
|
||||
|
||||
public Startup(IConfiguration configuration, ILogger<Startup> logger)
|
||||
@@ -33,7 +34,8 @@ public class Startup
|
||||
Configuration = configuration;
|
||||
_logger = logger;
|
||||
var mareSettings = Configuration.GetRequiredSection("MareSynchronos");
|
||||
_isMain = string.IsNullOrEmpty(mareSettings.GetValue(nameof(StaticFilesServerConfiguration.MainFileServerAddress), string.Empty));
|
||||
_isDistributionNode = mareSettings.GetValue(nameof(StaticFilesServerConfiguration.IsDistributionNode), false);
|
||||
_isMain = string.IsNullOrEmpty(mareSettings.GetValue(nameof(StaticFilesServerConfiguration.MainFileServerAddress), string.Empty)) && _isDistributionNode;
|
||||
}
|
||||
|
||||
public IConfiguration Configuration { get; }
|
||||
@@ -51,12 +53,15 @@ public class Startup
|
||||
|
||||
var mareConfig = Configuration.GetRequiredSection("MareSynchronos");
|
||||
|
||||
// metrics configuration
|
||||
services.AddSingleton(m => new MareMetrics(m.GetService<ILogger<MareMetrics>>(), new List<string>
|
||||
{
|
||||
MetricsAPI.CounterFileRequests,
|
||||
MetricsAPI.CounterFileRequestSize
|
||||
}, new List<string>
|
||||
{
|
||||
MetricsAPI.GaugeFilesTotalColdStorage,
|
||||
MetricsAPI.GaugeFilesTotalSizeColdStorage,
|
||||
MetricsAPI.GaugeFilesTotalSize,
|
||||
MetricsAPI.GaugeFilesTotal,
|
||||
MetricsAPI.GaugeFilesUniquePastDay,
|
||||
@@ -65,17 +70,33 @@ public class Startup
|
||||
MetricsAPI.GaugeFilesUniquePastHourSize,
|
||||
MetricsAPI.GaugeCurrentDownloads,
|
||||
MetricsAPI.GaugeDownloadQueue,
|
||||
MetricsAPI.GaugeDownloadQueueCancelled,
|
||||
MetricsAPI.GaugeDownloadPriorityQueue,
|
||||
MetricsAPI.GaugeDownloadPriorityQueueCancelled,
|
||||
MetricsAPI.GaugeQueueFree,
|
||||
MetricsAPI.GaugeQueueInactive,
|
||||
MetricsAPI.GaugeQueueActive,
|
||||
MetricsAPI.GaugeFilesDownloadingFromCache,
|
||||
MetricsAPI.GaugeFilesTasksWaitingForDownloadFromCache
|
||||
}));
|
||||
|
||||
// generic services
|
||||
services.AddSingleton<CachedFileProvider>();
|
||||
services.AddHostedService<FileCleanupService>();
|
||||
services.AddSingleton<FileStatisticsService>();
|
||||
services.AddSingleton<RequestFileStreamResultFactory>();
|
||||
|
||||
services.AddSingleton<ServerTokenGenerator>();
|
||||
services.AddSingleton<RequestQueueService>();
|
||||
services.AddHostedService(p => p.GetService<RequestQueueService>());
|
||||
services.AddHostedService(m => m.GetService<FileStatisticsService>());
|
||||
services.AddHostedService<FileCleanupService>();
|
||||
services.AddSingleton<IConfigurationService<MareConfigurationAuthBase>, MareConfigurationServiceClient<MareConfigurationAuthBase>>();
|
||||
services.AddHostedService(p => (MareConfigurationServiceClient<MareConfigurationAuthBase>)p.GetService<IConfigurationService<MareConfigurationAuthBase>>());
|
||||
|
||||
// specific services
|
||||
if (_isMain)
|
||||
{
|
||||
services.AddSingleton<IClientReadyMessageService, MainClientReadyMessageService>();
|
||||
services.AddSingleton<IConfigurationService<StaticFilesServerConfiguration>, MareConfigurationServiceServer<StaticFilesServerConfiguration>>();
|
||||
services.AddDbContextPool<MareDbContext>(options =>
|
||||
{
|
||||
options.UseNpgsql(Configuration.GetConnectionString("DefaultConnection"), builder =>
|
||||
@@ -85,63 +106,6 @@ public class Startup
|
||||
options.EnableThreadSafetyChecks(false);
|
||||
}, mareConfig.GetValue(nameof(MareConfigurationBase.DbContextPoolSize), 1024));
|
||||
|
||||
services.AddOptions<JwtBearerOptions>(JwtBearerDefaults.AuthenticationScheme)
|
||||
.Configure<IConfigurationService<MareConfigurationAuthBase>>((o, s) =>
|
||||
{
|
||||
o.TokenValidationParameters = new()
|
||||
{
|
||||
ValidateIssuer = false,
|
||||
ValidateLifetime = false,
|
||||
ValidateAudience = false,
|
||||
ValidateIssuerSigningKey = true,
|
||||
IssuerSigningKey = new SymmetricSecurityKey(Encoding.ASCII.GetBytes(s.GetValue<string>(nameof(MareConfigurationAuthBase.Jwt)))),
|
||||
};
|
||||
});
|
||||
|
||||
services.AddAuthentication(o =>
|
||||
{
|
||||
o.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
|
||||
o.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
|
||||
o.DefaultScheme = JwtBearerDefaults.AuthenticationScheme;
|
||||
}).AddJwtBearer();
|
||||
|
||||
services.AddAuthorization(options =>
|
||||
{
|
||||
options.FallbackPolicy = new AuthorizationPolicyBuilder().RequireAuthenticatedUser().Build();
|
||||
options.AddPolicy("Internal", new AuthorizationPolicyBuilder().RequireClaim(MareClaimTypes.Internal, "true").Build());
|
||||
});
|
||||
|
||||
if (_isMain)
|
||||
{
|
||||
services.AddSingleton<IConfigurationService<StaticFilesServerConfiguration>, MareConfigurationServiceServer<StaticFilesServerConfiguration>>();
|
||||
}
|
||||
else
|
||||
{
|
||||
services.AddSingleton<IConfigurationService<StaticFilesServerConfiguration>, MareConfigurationServiceClient<StaticFilesServerConfiguration>>();
|
||||
services.AddHostedService(p => (MareConfigurationServiceClient<StaticFilesServerConfiguration>)p.GetService<IConfigurationService<StaticFilesServerConfiguration>>());
|
||||
}
|
||||
|
||||
services.AddSingleton<IConfigurationService<MareConfigurationAuthBase>, MareConfigurationServiceClient<MareConfigurationAuthBase>>();
|
||||
|
||||
services.AddSingleton<ServerTokenGenerator>();
|
||||
services.AddSingleton<RequestQueueService>();
|
||||
services.AddHostedService(p => p.GetService<RequestQueueService>());
|
||||
services.AddControllers().ConfigureApplicationPartManager(a =>
|
||||
{
|
||||
a.FeatureProviders.Remove(a.FeatureProviders.OfType<ControllerFeatureProvider>().First());
|
||||
if (_isMain)
|
||||
{
|
||||
a.FeatureProviders.Add(new AllowedControllersFeatureProvider(typeof(MareStaticFilesServerConfigurationController), typeof(CacheController), typeof(RequestController), typeof(ServerFilesController)));
|
||||
}
|
||||
else
|
||||
{
|
||||
a.FeatureProviders.Add(new AllowedControllersFeatureProvider(typeof(CacheController), typeof(RequestController)));
|
||||
}
|
||||
});
|
||||
|
||||
services.AddHostedService(p => (MareConfigurationServiceClient<MareConfigurationAuthBase>)p.GetService<IConfigurationService<MareConfigurationAuthBase>>());
|
||||
|
||||
services.AddSingleton<IUserIdProvider, IdBasedUserIdProvider>();
|
||||
var signalRServiceBuilder = services.AddSignalR(hubOptions =>
|
||||
{
|
||||
hubOptions.MaximumReceiveMessageSize = long.MaxValue;
|
||||
@@ -203,6 +167,70 @@ public class Startup
|
||||
};
|
||||
|
||||
services.AddStackExchangeRedisExtensions<SystemTextJsonSerializer>(redisConfiguration);
|
||||
}
|
||||
else
|
||||
{
|
||||
services.AddSingleton<IClientReadyMessageService, ShardClientReadyMessageService>();
|
||||
services.AddSingleton<IConfigurationService<StaticFilesServerConfiguration>, MareConfigurationServiceClient<StaticFilesServerConfiguration>>();
|
||||
services.AddHostedService(p => (MareConfigurationServiceClient<StaticFilesServerConfiguration>)p.GetService<IConfigurationService<StaticFilesServerConfiguration>>());
|
||||
}
|
||||
|
||||
if (_isDistributionNode)
|
||||
{
|
||||
services.AddSingleton<ITouchHashService, ColdTouchHashService>();
|
||||
services.AddHostedService(p => p.GetService<ITouchHashService>());
|
||||
}
|
||||
else
|
||||
{
|
||||
services.AddSingleton<ITouchHashService, ShardTouchMessageService>();
|
||||
services.AddHostedService(p => p.GetService<ITouchHashService>());
|
||||
}
|
||||
|
||||
// controller setup
|
||||
services.AddControllers().ConfigureApplicationPartManager(a =>
|
||||
{
|
||||
a.FeatureProviders.Remove(a.FeatureProviders.OfType<ControllerFeatureProvider>().First());
|
||||
if (_isMain)
|
||||
{
|
||||
a.FeatureProviders.Add(new AllowedControllersFeatureProvider(typeof(MareStaticFilesServerConfigurationController),
|
||||
typeof(CacheController), typeof(RequestController), typeof(ServerFilesController),
|
||||
typeof(DistributionController), typeof(MainController)));
|
||||
}
|
||||
else if (_isDistributionNode)
|
||||
{
|
||||
a.FeatureProviders.Add(new AllowedControllersFeatureProvider(typeof(CacheController), typeof(RequestController), typeof(DistributionController)));
|
||||
}
|
||||
else
|
||||
{
|
||||
a.FeatureProviders.Add(new AllowedControllersFeatureProvider(typeof(CacheController), typeof(RequestController)));
|
||||
}
|
||||
});
|
||||
|
||||
// authentication and authorization
|
||||
services.AddOptions<JwtBearerOptions>(JwtBearerDefaults.AuthenticationScheme)
|
||||
.Configure<IConfigurationService<MareConfigurationAuthBase>>((o, s) =>
|
||||
{
|
||||
o.TokenValidationParameters = new()
|
||||
{
|
||||
ValidateIssuer = false,
|
||||
ValidateLifetime = false,
|
||||
ValidateAudience = false,
|
||||
ValidateIssuerSigningKey = true,
|
||||
IssuerSigningKey = new SymmetricSecurityKey(Encoding.ASCII.GetBytes(s.GetValue<string>(nameof(MareConfigurationAuthBase.Jwt))))
|
||||
};
|
||||
});
|
||||
services.AddAuthentication(o =>
|
||||
{
|
||||
o.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
|
||||
o.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
|
||||
o.DefaultScheme = JwtBearerDefaults.AuthenticationScheme;
|
||||
}).AddJwtBearer();
|
||||
services.AddAuthorization(options =>
|
||||
{
|
||||
options.FallbackPolicy = new AuthorizationPolicyBuilder().RequireAuthenticatedUser().Build();
|
||||
options.AddPolicy("Internal", new AuthorizationPolicyBuilder().RequireClaim(MareClaimTypes.Internal, "true").Build());
|
||||
});
|
||||
services.AddSingleton<IUserIdProvider, IdBasedUserIdProvider>();
|
||||
|
||||
services.AddHealthChecks();
|
||||
services.AddHttpLogging(e => e = new Microsoft.AspNetCore.HttpLogging.HttpLoggingOptions());
|
||||
@@ -225,8 +253,12 @@ public class Startup
|
||||
app.UseAuthorization();
|
||||
|
||||
app.UseEndpoints(e =>
|
||||
{
|
||||
if (_isMain)
|
||||
{
|
||||
e.MapHub<MareSynchronosServer.Hubs.MareHub>("/dummyhub");
|
||||
}
|
||||
|
||||
e.MapControllers();
|
||||
e.MapHealthChecks("/health").WithMetadata(new AllowAnonymousAttribute());
|
||||
});
|
||||
|
||||
@@ -1,13 +1,17 @@
|
||||
namespace MareSynchronosStaticFilesServer.Utils;
|
||||
using System.Text.RegularExpressions;
|
||||
|
||||
public static class FilePathUtil
|
||||
namespace MareSynchronosStaticFilesServer.Utils;
|
||||
|
||||
public static partial class FilePathUtil
|
||||
{
|
||||
public static FileInfo GetFileInfoForHash(string basePath, string hash)
|
||||
{
|
||||
FileInfo fi = new(Path.Combine(basePath, hash[0].ToString(), hash));
|
||||
if (hash.Length != 40 || !hash.All(char.IsAsciiLetterOrDigit)) throw new InvalidOperationException();
|
||||
|
||||
FileInfo fi = new(Path.Join(basePath, hash[0].ToString(), hash));
|
||||
if (!fi.Exists)
|
||||
{
|
||||
fi = new FileInfo(Path.Combine(basePath, hash));
|
||||
fi = new FileInfo(Path.Join(basePath, hash));
|
||||
if (!fi.Exists)
|
||||
{
|
||||
return null;
|
||||
@@ -19,8 +23,10 @@ public static class FilePathUtil
|
||||
|
||||
public static string GetFilePath(string basePath, string hash)
|
||||
{
|
||||
var dirPath = Path.Combine(basePath, hash[0].ToString());
|
||||
var path = Path.Combine(dirPath, hash);
|
||||
if (hash.Length != 40 || !hash.All(char.IsAsciiLetterOrDigit)) throw new InvalidOperationException();
|
||||
|
||||
var dirPath = Path.Join(basePath, hash[0].ToString());
|
||||
var path = Path.Join(dirPath, hash);
|
||||
if (!Directory.Exists(dirPath)) Directory.CreateDirectory(dirPath);
|
||||
return path;
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ public class RequestFileStreamResult : FileStreamResult
|
||||
_requestQueueService.FinishRequest(_requestId);
|
||||
|
||||
_mareMetrics.DecGauge(MetricsAPI.GaugeCurrentDownloads);
|
||||
FileStream?.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,8 +51,8 @@ public class RequestFileStreamResult : FileStreamResult
|
||||
finally
|
||||
{
|
||||
_requestQueueService.FinishRequest(_requestId);
|
||||
|
||||
_mareMetrics.DecGauge(MetricsAPI.GaugeCurrentDownloads);
|
||||
FileStream?.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,6 @@
|
||||
namespace MareSynchronosStaticFilesServer.Utils;
|
||||
|
||||
public record UserRequest(Guid RequestId, string User, List<string> FileIds);
|
||||
public record UserRequest(Guid RequestId, string User, List<string> FileIds)
|
||||
{
|
||||
public bool IsCancelled { get; set; } = false;
|
||||
}
|
||||
Reference in New Issue
Block a user