From aa177583e055a6dafaaee53c6ad18df526241652 Mon Sep 17 00:00:00 2001 From: rootdarkarchon Date: Sat, 14 Jan 2023 12:57:34 +0100 Subject: [PATCH] sanitize queue processing --- .../Services/RequestQueueService.cs | 73 ++++++++++++------- 1 file changed, 48 insertions(+), 25 deletions(-) diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs index c18c665..06c40f5 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs @@ -35,22 +35,34 @@ public class RequestQueueService : IHostedService return QueueStatus.Waiting; } - await _queueSemaphore.WaitAsync().ConfigureAwait(false); - QueueStatus status = QueueStatus.Waiting; - var idx = Array.FindIndex(_userQueueRequests, r => r == null); - if (idx == -1) + try { - _queue.Enqueue(request); - status = QueueStatus.Waiting; - } - else - { - DequeueIntoSlot(request, idx); - status = QueueStatus.Ready; - } - _queueSemaphore.Release(); + await _queueSemaphore.WaitAsync().ConfigureAwait(false); + QueueStatus status = QueueStatus.Waiting; + var idx = Array.FindIndex(_userQueueRequests, r => r == null); + if (idx == -1) + { + _queue.Enqueue(request); + status = QueueStatus.Waiting; + } + else + { + DequeueIntoSlot(request, idx); + status = QueueStatus.Ready; + } - return status; + return status; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error during EnqueueUser"); + } + finally + { + _queueSemaphore.Release(); + } + + throw new Exception("Error during EnqueueUser"); } public bool StillEnqueued(Guid request, string user, out int queuePosition) @@ -92,24 +104,35 @@ public class RequestQueueService : IHostedService { while (!ct.IsCancellationRequested) { - await _queueProcessingSemaphore.WaitAsync(ct).ConfigureAwait(false); - await _queueSemaphore.WaitAsync(ct).ConfigureAwait(false); - for (int i = 0; i < _userQueueRequests.Length; i++) + try { - if (_userQueueRequests[i] != null && !_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow) _userQueueRequests[i] = null; - - if (_userQueueRequests[i] == null) + await _queueProcessingSemaphore.WaitAsync(ct).ConfigureAwait(false); + await _queueSemaphore.WaitAsync(ct).ConfigureAwait(false); + for (int i = 0; i < _userQueueRequests.Length; i++) { - if (_queue.TryDequeue(out var request)) + if (_userQueueRequests[i] != null && !_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow) _userQueueRequests[i] = null; + + if (_userQueueRequests[i] == null) { - DequeueIntoSlot(request, i); + if (_queue.TryDequeue(out var request)) + { + DequeueIntoSlot(request, i); + } } + + if (!_queue.Any()) break; } - if (!_queue.Any()) break; } - _queueProcessingSemaphore.Release(); - _queueSemaphore.Release(); + catch (Exception ex) + { + _logger.LogError(ex, "Error during Queue processing"); + } + finally + { + _queueProcessingSemaphore.Release(); + _queueSemaphore.Release(); + } _metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueue, _queue.Count);