refactor file server

This commit is contained in:
rootdarkarchon
2024-01-15 12:26:09 +01:00
committed by Loporrit
parent d40730bca3
commit d54d793575
13 changed files with 443 additions and 179 deletions

View File

@@ -21,7 +21,8 @@ public sealed class CachedFileProvider : IDisposable
private readonly SemaphoreSlim _downloadSemaphore = new(1);
private bool _disposed;
private bool IsMainServer => _remoteCacheSourceUri == null;
private bool IsMainServer => _remoteCacheSourceUri == null && _isDistributionServer;
private bool _isDistributionServer;
public CachedFileProvider(IConfigurationService<StaticFilesServerConfiguration> configuration, ILogger<CachedFileProvider> logger, FileStatisticsService fileStatisticsService, MareMetrics metrics, ServerTokenGenerator generator)
{
@@ -30,8 +31,10 @@ public sealed class CachedFileProvider : IDisposable
_metrics = metrics;
_generator = generator;
_remoteCacheSourceUri = configuration.GetValueOrDefault<Uri>(nameof(StaticFilesServerConfiguration.MainFileServerAddress), null);
_isDistributionServer = configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.IsDistributionNode), false);
_basePath = configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
_httpClient = new();
_httpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("MareSynchronosServer"));
}
public void Dispose()
@@ -48,7 +51,7 @@ public sealed class CachedFileProvider : IDisposable
private async Task DownloadTask(string hash)
{
// download file from remote
var downloadUrl = MareFiles.ServerFilesGetFullPath(_remoteCacheSourceUri, hash);
var downloadUrl = MareFiles.DistributionGetFullPath(_remoteCacheSourceUri, hash);
_logger.LogInformation("Did not find {hash}, downloading from {server}", hash, downloadUrl);
using var requestMessage = new HttpRequestMessage(HttpMethod.Get, downloadUrl);
@@ -129,12 +132,4 @@ public sealed class CachedFileProvider : IDisposable
return GetLocalFileStream(hash);
}
private void ThrowIfDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().FullName);
}
}
}

View File

@@ -0,0 +1,6 @@
namespace MareSynchronosStaticFilesServer.Services;
public interface IClientReadyMessageService
{
void SendDownloadReady(string uid, Guid requestId);
}

View File

@@ -0,0 +1,26 @@
using Microsoft.AspNetCore.SignalR;
using MareSynchronos.API.SignalR;
using MareSynchronosServer.Hubs;
namespace MareSynchronosStaticFilesServer.Services;
public class MainClientReadyMessageService : IClientReadyMessageService
{
private readonly ILogger<MainClientReadyMessageService> _logger;
private readonly IHubContext<MareHub> _mareHub;
public MainClientReadyMessageService(ILogger<MainClientReadyMessageService> logger, IHubContext<MareHub> mareHub)
{
_logger = logger;
_mareHub = mareHub;
}
public void SendDownloadReady(string uid, Guid requestId)
{
_ = Task.Run(async () =>
{
_logger.LogInformation("Sending Client Ready for {uid}:{requestId} to SignalR", uid, requestId);
await _mareHub.Clients.User(uid).SendAsync(nameof(IMareHub.Client_DownloadReady), requestId).ConfigureAwait(false);
});
}
}

View File

@@ -8,23 +8,21 @@ using Microsoft.EntityFrameworkCore;
namespace MareSynchronosStaticFilesServer.Services;
public class FileCleanupService : IHostedService
public class MainFileCleanupService : IHostedService
{
private readonly string _cacheDir;
private readonly IConfigurationService<StaticFilesServerConfiguration> _configuration;
private readonly bool _isMainServer;
private readonly ILogger<FileCleanupService> _logger;
private readonly ILogger<MainFileCleanupService> _logger;
private readonly MareMetrics _metrics;
private readonly IServiceProvider _services;
private CancellationTokenSource _cleanupCts;
public FileCleanupService(MareMetrics metrics, ILogger<FileCleanupService> logger, IServiceProvider services, IConfigurationService<StaticFilesServerConfiguration> configuration)
public MainFileCleanupService(MareMetrics metrics, ILogger<MainFileCleanupService> logger, IServiceProvider services, IConfigurationService<StaticFilesServerConfiguration> configuration)
{
_metrics = metrics;
_logger = logger;
_services = services;
_configuration = configuration;
_isMainServer = configuration.IsMain;
_cacheDir = _configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
}
@@ -48,12 +46,9 @@ public class FileCleanupService : IHostedService
CleanUpFilesBeyondSizeLimit(dbContext, ct);
if (_isMainServer)
{
CleanUpStuckUploads(dbContext);
CleanUpStuckUploads(dbContext);
await dbContext.SaveChangesAsync(ct).ConfigureAwait(false);
}
await dbContext.SaveChangesAsync(ct).ConfigureAwait(false);
}
catch (Exception e)
{
@@ -113,11 +108,8 @@ public class FileCleanupService : IHostedService
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
_logger.LogInformation("Deleting {oldestFile} with size {size}MiB", oldestFile.FullName, ByteSize.FromBytes(oldestFile.Length).MebiBytes);
oldestFile.Delete();
if (_isMainServer)
{
FileCache f = new() { Hash = oldestFile.Name.ToUpperInvariant() };
dbContext.Entry(f).State = EntityState.Deleted;
}
FileCache f = new() { Hash = oldestFile.Name.ToUpperInvariant() };
dbContext.Entry(f).State = EntityState.Deleted;
}
}
catch (Exception ex)
@@ -128,21 +120,18 @@ public class FileCleanupService : IHostedService
private void CleanUpOrphanedFiles(List<FileCache> allFiles, FileInfo[] allPhysicalFiles, CancellationToken ct)
{
if (_isMainServer)
var allFilesHashes = new HashSet<string>(allFiles.Select(a => a.Hash.ToUpperInvariant()), StringComparer.Ordinal);
foreach (var file in allPhysicalFiles)
{
var allFilesHashes = new HashSet<string>(allFiles.Select(a => a.Hash.ToUpperInvariant()), StringComparer.Ordinal);
foreach (var file in allPhysicalFiles)
if (!allFilesHashes.Contains(file.Name.ToUpperInvariant()))
{
if (!allFilesHashes.Contains(file.Name.ToUpperInvariant()))
{
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length);
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
file.Delete();
_logger.LogInformation("File not in DB, deleting: {fileName}", file.Name);
}
ct.ThrowIfCancellationRequested();
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length);
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
file.Delete();
_logger.LogInformation("File not in DB, deleting: {fileName}", file.Name);
}
ct.ThrowIfCancellationRequested();
}
}
@@ -164,13 +153,15 @@ public class FileCleanupService : IHostedService
var prevTimeForcedDeletion = DateTime.Now.Subtract(TimeSpan.FromHours(forcedDeletionAfterHours));
DirectoryInfo dir = new(_cacheDir);
var allFilesInDir = dir.GetFiles("*", SearchOption.AllDirectories);
var allFiles = await dbContext.Files.ToListAsync().ConfigureAwait(false);
var files = dbContext.Files.OrderBy(f => f.Hash);
List<FileCache> allFiles = await dbContext.Files.ToListAsync(ct).ConfigureAwait(false);
int fileCounter = 0;
foreach (var fileCache in allFiles.Where(f => f.Uploaded))
{
var file = FilePathUtil.GetFileInfoForHash(_cacheDir, fileCache.Hash);
bool fileDeleted = false;
if (file == null && _isMainServer)
var file = FilePathUtil.GetFileInfoForHash(_cacheDir, fileCache.Hash);
if (file == null)
{
_logger.LogInformation("File does not exist anymore: {fileName}", fileCache.Hash);
dbContext.Files.Remove(fileCache);
@@ -182,11 +173,8 @@ public class FileCleanupService : IHostedService
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
_logger.LogInformation("File outdated: {fileName}, {fileSize}MiB", file.Name, ByteSize.FromBytes(file.Length).MebiBytes);
file.Delete();
if (_isMainServer)
{
fileDeleted = true;
dbContext.Files.Remove(fileCache);
}
fileDeleted = true;
dbContext.Files.Remove(fileCache);
}
else if (file != null && forcedDeletionAfterHours > 0 && file.LastWriteTime < prevTimeForcedDeletion)
{
@@ -194,14 +182,11 @@ public class FileCleanupService : IHostedService
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
_logger.LogInformation("File forcefully deleted: {fileName}, {fileSize}MiB", file.Name, ByteSize.FromBytes(file.Length).MebiBytes);
file.Delete();
if (_isMainServer)
{
fileDeleted = true;
dbContext.Files.Remove(fileCache);
}
fileDeleted = true;
dbContext.Files.Remove(fileCache);
}
if (_isMainServer && !fileDeleted && file != null && fileCache.Size == 0)
if (!fileDeleted && file != null && fileCache.Size == 0)
{
_logger.LogInformation("Setting File Size of " + fileCache.Hash + " to " + file.Length);
fileCache.Size = file.Length;

View File

@@ -1,32 +1,27 @@
using MareSynchronosShared.Metrics;
using MareSynchronosShared.Services;
using MareSynchronosStaticFilesServer.Utils;
using Microsoft.AspNetCore.SignalR;
using System.Collections.Concurrent;
using System.Timers;
using MareSynchronos.API.SignalR;
namespace MareSynchronosStaticFilesServer.Services;
public class RequestQueueService : IHostedService
{
private record PriorityEntry(bool IsHighPriority, DateTime LastChecked);
private readonly IHubContext<MareSynchronosServer.Hubs.MareHub> _hubContext;
private readonly IClientReadyMessageService _clientReadyMessageService;
private readonly ILogger<RequestQueueService> _logger;
private readonly MareMetrics _metrics;
private readonly ConcurrentQueue<UserRequest> _queue = new();
private readonly ConcurrentQueue<UserRequest> _priorityQueue = new();
private readonly int _queueExpirationSeconds;
private readonly SemaphoreSlim _queueProcessingSemaphore = new(1);
private readonly SemaphoreSlim _queueSemaphore = new(1);
private readonly UserQueueEntry[] _userQueueRequests;
private int _queueLimitForReset;
private readonly int _queueReleaseSeconds;
private System.Timers.Timer _queueTimer;
public RequestQueueService(MareMetrics metrics, IConfigurationService<StaticFilesServerConfiguration> configurationService,
ILogger<RequestQueueService> logger, IHubContext<MareSynchronosServer.Hubs.MareHub> hubContext)
ILogger<RequestQueueService> logger, IClientReadyMessageService hubContext)
{
_userQueueRequests = new UserQueueEntry[configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueSize), 50)];
_queueExpirationSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadTimeoutSeconds), 5);
@@ -34,7 +29,7 @@ public class RequestQueueService : IHostedService
_queueReleaseSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueReleaseSeconds), 15);
_metrics = metrics;
_logger = logger;
_hubContext = hubContext;
_clientReadyMessageService = hubContext;
}
public void ActivateRequest(Guid request)
@@ -125,7 +120,7 @@ public class RequestQueueService : IHostedService
{
_logger.LogDebug("Dequeueing {req} into {i}: {user} with {file}", userRequest.RequestId, slot, userRequest.User, string.Join(", ", userRequest.FileIds));
_userQueueRequests[slot] = new(userRequest, DateTime.UtcNow.AddSeconds(_queueExpirationSeconds));
_ = _hubContext.Clients.User(userRequest.User).SendAsync(nameof(IMareHub.Client_DownloadReady), userRequest.RequestId).ConfigureAwait(false);
_clientReadyMessageService.SendDownloadReady(userRequest.User, userRequest.RequestId);
}
private void ProcessQueue(object src, ElapsedEventArgs e)

View File

@@ -0,0 +1,47 @@
using MareSynchronos.API.Routes;
using MareSynchronosShared.Services;
using MareSynchronosShared.Utils;
using System.Net.Http.Headers;
namespace MareSynchronosStaticFilesServer.Services;
public class ShardClientReadyMessageService : IClientReadyMessageService
{
private readonly ILogger<ShardClientReadyMessageService> _logger;
private readonly ServerTokenGenerator _tokenGenerator;
private readonly IConfigurationService<StaticFilesServerConfiguration> _configurationService;
private readonly HttpClient _httpClient;
public ShardClientReadyMessageService(ILogger<ShardClientReadyMessageService> logger, ServerTokenGenerator tokenGenerator, IConfigurationService<StaticFilesServerConfiguration> configurationService)
{
_logger = logger;
_tokenGenerator = tokenGenerator;
_configurationService = configurationService;
_httpClient = new();
_httpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("MareSynchronosServer"));
}
public void SendDownloadReady(string uid, Guid requestId)
{
var mainUrl = _configurationService.GetValue<Uri>(nameof(StaticFilesServerConfiguration.MainFileServerAddress));
var path = MareFiles.MainSendReadyFullPath(mainUrl, uid, requestId);
using HttpRequestMessage msg = new()
{
RequestUri = path
};
msg.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _tokenGenerator.Token);
_ = Task.Run(async () =>
{
_logger.LogInformation("Sending Client Ready for {uid}:{requestId} to {path}", uid, requestId, path);
try
{
using var result = await _httpClient.SendAsync(msg).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failure to send for {uid}:{requestId}", uid, requestId);
}
});
}
}

View File

@@ -0,0 +1,153 @@
using ByteSizeLib;
using MareSynchronosShared.Metrics;
using MareSynchronosShared.Services;
using Microsoft.EntityFrameworkCore;
namespace MareSynchronosStaticFilesServer.Services;
public class ShardFileCleanupService : IHostedService
{
private readonly string _cacheDir;
private readonly IConfigurationService<StaticFilesServerConfiguration> _configuration;
private readonly ILogger<MainFileCleanupService> _logger;
private readonly MareMetrics _metrics;
private CancellationTokenSource _cleanupCts;
public ShardFileCleanupService(MareMetrics metrics, ILogger<MainFileCleanupService> logger, IConfigurationService<StaticFilesServerConfiguration> configuration)
{
_metrics = metrics;
_logger = logger;
_configuration = configuration;
_cacheDir = _configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
}
public async Task CleanUpTask(CancellationToken ct)
{
_logger.LogInformation("Starting periodic cleanup task");
while (!ct.IsCancellationRequested)
{
try
{
DirectoryInfo dir = new(_cacheDir);
var allFiles = dir.GetFiles("*", SearchOption.AllDirectories);
_metrics.SetGaugeTo(MetricsAPI.GaugeFilesTotalSize, allFiles.Sum(f => f.Length));
_metrics.SetGaugeTo(MetricsAPI.GaugeFilesTotal, allFiles.Length);
CleanUpOutdatedFiles(ct);
CleanUpFilesBeyondSizeLimit(ct);
}
catch (Exception e)
{
_logger.LogError(e, "Error during cleanup task");
}
var now = DateTime.Now;
TimeOnly currentTime = new(now.Hour, now.Minute, now.Second);
TimeOnly futureTime = new(now.Hour, now.Minute - now.Minute % 15, 0);
var span = futureTime.AddMinutes(15) - currentTime;
_logger.LogInformation("File Cleanup Complete, next run at {date}", now.Add(span));
await Task.Delay(span, ct).ConfigureAwait(false);
}
}
public Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Cleanup Service started");
_cleanupCts = new();
_ = CleanUpTask(_cleanupCts.Token);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_cleanupCts.Cancel();
return Task.CompletedTask;
}
private void CleanUpFilesBeyondSizeLimit(CancellationToken ct)
{
var sizeLimit = _configuration.GetValueOrDefault<double>(nameof(StaticFilesServerConfiguration.CacheSizeHardLimitInGiB), -1);
if (sizeLimit <= 0)
{
return;
}
try
{
_logger.LogInformation("Cleaning up files beyond the cache size limit of {cacheSizeLimit} GiB", sizeLimit);
var allLocalFiles = Directory.EnumerateFiles(_cacheDir, "*", SearchOption.AllDirectories)
.Select(f => new FileInfo(f)).ToList()
.OrderBy(f => f.LastAccessTimeUtc).ToList();
var totalCacheSizeInBytes = allLocalFiles.Sum(s => s.Length);
long cacheSizeLimitInBytes = (long)ByteSize.FromGibiBytes(sizeLimit).Bytes;
while (totalCacheSizeInBytes > cacheSizeLimitInBytes && allLocalFiles.Any() && !ct.IsCancellationRequested)
{
var oldestFile = allLocalFiles[0];
allLocalFiles.Remove(oldestFile);
totalCacheSizeInBytes -= oldestFile.Length;
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, oldestFile.Length);
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
_logger.LogInformation("Deleting {oldestFile} with size {size}MiB", oldestFile.FullName, ByteSize.FromBytes(oldestFile.Length).MebiBytes);
oldestFile.Delete();
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error during cache size limit cleanup");
}
}
private void CleanUpOutdatedFiles(CancellationToken ct)
{
try
{
var unusedRetention = _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UnusedFileRetentionPeriodInDays), 14);
var forcedDeletionAfterHours = _configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.ForcedDeletionOfFilesAfterHours), -1);
_logger.LogInformation("Cleaning up files older than {filesOlderThanDays} days", unusedRetention);
if (forcedDeletionAfterHours > 0)
{
_logger.LogInformation("Cleaning up files written to longer than {hours}h ago", forcedDeletionAfterHours);
}
var prevTime = DateTime.Now.Subtract(TimeSpan.FromDays(unusedRetention));
var prevTimeForcedDeletion = DateTime.Now.Subtract(TimeSpan.FromHours(forcedDeletionAfterHours));
DirectoryInfo dir = new(_cacheDir);
var allFilesInDir = dir.GetFiles("*", SearchOption.AllDirectories);
int fileCounter = 0;
foreach (var file in allFilesInDir)
{
if (file.LastAccessTime < prevTime)
{
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length);
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
_logger.LogInformation("File outdated: {fileName}, {fileSize}MiB", file.Name, ByteSize.FromBytes(file.Length).MebiBytes);
file.Delete();
}
else if (forcedDeletionAfterHours > 0 && file.LastWriteTime < prevTimeForcedDeletion)
{
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length);
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
_logger.LogInformation("File forcefully deleted: {fileName}, {fileSize}MiB", file.Name, ByteSize.FromBytes(file.Length).MebiBytes);
file.Delete();
}
fileCounter++;
ct.ThrowIfCancellationRequested();
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error during file cleanup of old files");
}
}
}