diff --git a/MareSynchronosServer/MareSynchronosShared/Metrics/MetricsAPI.cs b/MareSynchronosServer/MareSynchronosShared/Metrics/MetricsAPI.cs index 3ca9592..0503c10 100644 --- a/MareSynchronosServer/MareSynchronosShared/Metrics/MetricsAPI.cs +++ b/MareSynchronosServer/MareSynchronosShared/Metrics/MetricsAPI.cs @@ -31,6 +31,9 @@ public class MetricsAPI public const string GaugeQueueActive = "mare_download_queue_active"; public const string GaugeQueueInactive = "mare_download_queue_inactive"; public const string GaugeDownloadQueue = "mare_download_queue"; + public const string GaugeDownloadQueueCancelled = "mare_download_queue_cancelled"; + public const string GaugeDownloadPriorityQueue = "mare_download_priority_queue"; + public const string GaugeDownloadPriorityQueueCancelled = "mare_download_priority_queue_cancelled"; public const string CounterFileRequests = "mare_files_requests"; public const string CounterFileRequestSize = "mare_files_request_size"; public const string CounterAccountsCreated = "mare_accounts_created"; diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/RequestController.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/RequestController.cs index 0fab291..833d4a6 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/RequestController.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/RequestController.cs @@ -9,7 +9,6 @@ public class RequestController : ControllerBase { private readonly CachedFileProvider _cachedFileProvider; private readonly RequestQueueService _requestQueue; - private static readonly SemaphoreSlim _parallelRequestSemaphore = new(500); public RequestController(ILogger logger, CachedFileProvider cachedFileProvider, RequestQueueService requestQueue) : base(logger) { @@ -23,15 +22,10 @@ public class RequestController : ControllerBase { try { - await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted); _requestQueue.RemoveFromQueue(requestId, MareUser, IsPriority); return Ok(); } catch (OperationCanceledException) { return BadRequest(); } - finally - { - _parallelRequestSemaphore.Release(); - } } [HttpPost] @@ -40,7 +34,6 @@ public class RequestController : ControllerBase { try { - await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted); foreach (var file in files) { _logger.LogDebug("Prerequested file: " + file); @@ -48,15 +41,11 @@ public class RequestController : ControllerBase } Guid g = Guid.NewGuid(); - _requestQueue.EnqueueUser(new(g, MareUser, files.ToList()), IsPriority); + await _requestQueue.EnqueueUser(new(g, MareUser, files.ToList()), IsPriority, HttpContext.RequestAborted); return Ok(g); } catch (OperationCanceledException) { return BadRequest(); } - finally - { - _parallelRequestSemaphore.Release(); - } } [HttpGet] @@ -66,7 +55,7 @@ public class RequestController : ControllerBase try { if (!_requestQueue.StillEnqueued(requestId, MareUser, IsPriority)) - _requestQueue.EnqueueUser(new(requestId, MareUser, files.ToList()), IsPriority); + await _requestQueue.EnqueueUser(new(requestId, MareUser, files.ToList()), IsPriority, HttpContext.RequestAborted); return Ok(); } catch (OperationCanceledException) { return BadRequest(); } diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs index b1a2ad6..690a3de 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs @@ -19,7 +19,6 @@ public class RequestQueueService : IHostedService private readonly ConcurrentQueue _priorityQueue = new(); private readonly int _queueExpirationSeconds; private readonly SemaphoreSlim _queueProcessingSemaphore = new(1); - private readonly ConcurrentDictionary _queueRemoval = new(); private readonly SemaphoreSlim _queueSemaphore = new(1); private readonly UserQueueEntry[] _userQueueRequests; private int _queueLimitForReset; @@ -45,8 +44,13 @@ public class RequestQueueService : IHostedService req.MarkActive(); } - public void EnqueueUser(UserRequest request, bool isPriority) + public async Task EnqueueUser(UserRequest request, bool isPriority, CancellationToken token) { + while (_queueProcessingSemaphore.CurrentCount == 0) + { + await Task.Delay(50, token).ConfigureAwait(false); + } + _logger.LogDebug("Enqueueing req {guid} from {user} for {file}", request.RequestId, request.User, string.Join(", ", request.FileIds)); GetQueue(isPriority).Enqueue(request); @@ -88,10 +92,11 @@ public class RequestQueueService : IHostedService _userQueueRequests[idx] = null; } } - - return; } - _queueRemoval[requestId] = user; + else + { + existingRequest.IsCancelled = true; + } } public Task StartAsync(CancellationToken cancellationToken) @@ -116,22 +121,22 @@ public class RequestQueueService : IHostedService return Task.CompletedTask; } - private async Task DequeueIntoSlotAsync(UserRequest userRequest, int slot) + private void DequeueIntoSlot(UserRequest userRequest, int slot) { _logger.LogDebug("Dequeueing {req} into {i}: {user} with {file}", userRequest.RequestId, slot, userRequest.User, string.Join(", ", userRequest.FileIds)); _userQueueRequests[slot] = new(userRequest, DateTime.UtcNow.AddSeconds(_queueExpirationSeconds)); - await _hubContext.Clients.User(userRequest.User).SendAsync(nameof(IMareHub.Client_DownloadReady), userRequest.RequestId).ConfigureAwait(false); + _ = _hubContext.Clients.User(userRequest.User).SendAsync(nameof(IMareHub.Client_DownloadReady), userRequest.RequestId).ConfigureAwait(false); } - private async void ProcessQueue(object src, ElapsedEventArgs e) + private void ProcessQueue(object src, ElapsedEventArgs e) { if (_queueProcessingSemaphore.CurrentCount == 0) return; - await _queueProcessingSemaphore.WaitAsync().ConfigureAwait(false); + _queueProcessingSemaphore.Wait(); try { - if (_queue.Count > _queueLimitForReset) + if (_queue.Count(c => !c.IsCancelled) > _queueLimitForReset) { _queue.Clear(); return; @@ -141,56 +146,36 @@ public class RequestQueueService : IHostedService { try { - if (_userQueueRequests[i] != null && ((!_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow))) + if (_userQueueRequests[i] != null + && (((!_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow)) + || (_userQueueRequests[i].IsActive && _userQueueRequests[i].ActivationDate < DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(_queueReleaseSeconds)))) + ) { - _logger.LogDebug("Expiring inactive request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i); + _logger.LogDebug("Expiring request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i); _userQueueRequests[i] = null; } - if (_userQueueRequests[i] != null && (_userQueueRequests[i].IsActive && _userQueueRequests[i].ActivationDate < DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(_queueReleaseSeconds)))) - { - _logger.LogDebug("Expiring active request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i); - _userQueueRequests[i] = null; - } + if (_userQueueRequests[i] != null) continue; - if (!_queue.Any()) return; - - if (_userQueueRequests[i] == null) + while (true) { - bool enqueued = false; - while (!enqueued) + if (_priorityQueue.TryDequeue(out var prioRequest)) { - if (_priorityQueue.TryDequeue(out var prioRequest)) - { - if (_queueRemoval.TryGetValue(prioRequest.RequestId, out string user) && string.Equals(user, prioRequest.User, StringComparison.Ordinal)) - { - _logger.LogDebug("Request cancelled: {requestId} by {user}", prioRequest.RequestId, user); - _queueRemoval.Remove(prioRequest.RequestId, out _); - continue; - } + if (prioRequest.IsCancelled) continue; - await DequeueIntoSlotAsync(prioRequest, i).ConfigureAwait(false); - enqueued = true; - break; - } - - if (_queue.TryDequeue(out var request)) - { - if (_queueRemoval.TryGetValue(request.RequestId, out string user) && string.Equals(user, request.User, StringComparison.Ordinal)) - { - _logger.LogDebug("Request cancelled: {requestId} by {user}", request.RequestId, user); - _queueRemoval.Remove(request.RequestId, out _); - continue; - } - - await DequeueIntoSlotAsync(request, i).ConfigureAwait(false); - enqueued = true; - } - else - { - enqueued = true; - } + DequeueIntoSlot(prioRequest, i); + break; } + + if (_queue.TryDequeue(out var request)) + { + if (request.IsCancelled) continue; + + DequeueIntoSlot(request, i); + break; + } + + break; } } catch (Exception ex) @@ -211,6 +196,9 @@ public class RequestQueueService : IHostedService _metrics.SetGaugeTo(MetricsAPI.GaugeQueueFree, _userQueueRequests.Count(c => c == null)); _metrics.SetGaugeTo(MetricsAPI.GaugeQueueActive, _userQueueRequests.Count(c => c != null && c.IsActive)); _metrics.SetGaugeTo(MetricsAPI.GaugeQueueInactive, _userQueueRequests.Count(c => c != null && !c.IsActive)); - _metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueue, _queue.Count); + _metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueue, _queue.Count(q => !q.IsCancelled)); + _metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueueCancelled, _queue.Count(q => q.IsCancelled)); + _metrics.SetGaugeTo(MetricsAPI.GaugeDownloadPriorityQueue, _priorityQueue.Count(q => !q.IsCancelled)); + _metrics.SetGaugeTo(MetricsAPI.GaugeDownloadPriorityQueueCancelled, _priorityQueue.Count(q => q.IsCancelled)); } } \ No newline at end of file diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Startup.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Startup.cs index 089ee24..f13d44e 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Startup.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Startup.cs @@ -65,6 +65,9 @@ public class Startup MetricsAPI.GaugeFilesUniquePastHourSize, MetricsAPI.GaugeCurrentDownloads, MetricsAPI.GaugeDownloadQueue, + MetricsAPI.GaugeDownloadQueueCancelled, + MetricsAPI.GaugeDownloadPriorityQueue, + MetricsAPI.GaugeDownloadPriorityQueueCancelled, MetricsAPI.GaugeQueueFree, MetricsAPI.GaugeQueueInactive, MetricsAPI.GaugeQueueActive, diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/UserRequest.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/UserRequest.cs index 4640c09..f64def1 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/UserRequest.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/UserRequest.cs @@ -1,3 +1,6 @@ namespace MareSynchronosStaticFilesServer.Utils; -public record UserRequest(Guid RequestId, string User, List FileIds); +public record UserRequest(Guid RequestId, string User, List FileIds) +{ + public bool IsCancelled { get; set; } = false; +} \ No newline at end of file