From 701a5763b5ea45050d7fe326ac795a3aa5c2b6d6 Mon Sep 17 00:00:00 2001 From: rootdarkarchon Date: Sun, 17 Sep 2023 03:21:10 +0200 Subject: [PATCH] add high priority queue --- .../Controllers/RequestController.cs | 2 +- .../Services/RequestQueueService.cs | 53 +++++++++++++++++-- 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/RequestController.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/RequestController.cs index 7d4609d..ee52f91 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/RequestController.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/RequestController.cs @@ -65,7 +65,7 @@ public class RequestController : ControllerBase { try { - if (!_requestQueue.StillEnqueued(requestId, MareUser)) + if (!await _requestQueue.StillEnqueued(requestId, MareUser)) await _requestQueue.EnqueueUser(new(requestId, MareUser, files.ToList())); return Ok(); } diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs index 8df7a0a..514e024 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs @@ -5,25 +5,34 @@ using Microsoft.AspNetCore.SignalR; using System.Collections.Concurrent; using System.Timers; using MareSynchronos.API.SignalR; +using MareSynchronosShared.Data; +using Microsoft.EntityFrameworkCore; +using System.Linq; namespace MareSynchronosStaticFilesServer.Services; public class RequestQueueService : IHostedService { + private record PriorityEntry(bool IsHighPriority, DateTime LastChecked); + private readonly IHubContext _hubContext; + private readonly MareDbContext _mareDbContext; private readonly ILogger _logger; private readonly MareMetrics _metrics; private readonly ConcurrentQueue _queue = new(); + private readonly ConcurrentQueue _priorityQueue = new(); private readonly int _queueExpirationSeconds; private readonly SemaphoreSlim _queueProcessingSemaphore = new(1); private readonly ConcurrentDictionary _queueRemoval = new(); private readonly SemaphoreSlim _queueSemaphore = new(1); private readonly UserQueueEntry[] _userQueueRequests; + private readonly ConcurrentDictionary _priorityCache = new(StringComparer.Ordinal); private int _queueLimitForReset; private readonly int _queueReleaseSeconds; private System.Timers.Timer _queueTimer; - public RequestQueueService(MareMetrics metrics, IConfigurationService configurationService, ILogger logger, IHubContext hubContext) + public RequestQueueService(MareMetrics metrics, IConfigurationService configurationService, + ILogger logger, IHubContext hubContext, MareDbContext mareDbContext) { _userQueueRequests = new UserQueueEntry[configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueSize), 50)]; _queueExpirationSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadTimeoutSeconds), 5); @@ -32,6 +41,7 @@ public class RequestQueueService : IHostedService _metrics = metrics; _logger = logger; _hubContext = hubContext; + _mareDbContext = mareDbContext; } public void ActivateRequest(Guid request) @@ -41,20 +51,36 @@ public class RequestQueueService : IHostedService req.MarkActive(); } + private async Task IsHighPriority(string uid) + { + if (!_priorityCache.TryGetValue(uid, out PriorityEntry entry) || entry.LastChecked.Add(TimeSpan.FromHours(6)) < DateTime.UtcNow) + { + var user = await _mareDbContext.Users.FirstOrDefaultAsync(u => u.UID == uid).ConfigureAwait(false); + entry = new(user != null && !string.IsNullOrEmpty(user.Alias), DateTime.UtcNow); + _priorityCache[uid] = entry; + } + + return entry.IsHighPriority; + } + public async Task EnqueueUser(UserRequest request) { _logger.LogDebug("Enqueueing req {guid} from {user} for {file}", request.RequestId, request.User, string.Join(", ", request.FileIds)); + bool isPriorityQueue = await IsHighPriority(request.User).ConfigureAwait(false); + if (_queueProcessingSemaphore.CurrentCount == 0) { - _queue.Enqueue(request); + if (isPriorityQueue) _priorityQueue.Enqueue(request); + else _queue.Enqueue(request); return; } try { await _queueSemaphore.WaitAsync().ConfigureAwait(false); - _queue.Enqueue(request); + if (isPriorityQueue) _priorityQueue.Enqueue(request); + else _queue.Enqueue(request); return; } @@ -120,8 +146,13 @@ public class RequestQueueService : IHostedService return Task.CompletedTask; } - public bool StillEnqueued(Guid request, string user) + public async Task StillEnqueued(Guid request, string user) { + bool isPriorityQueue = await IsHighPriority(user).ConfigureAwait(false); + if (isPriorityQueue) + { + return _priorityQueue.Any(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal)); + } return _queue.Any(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal)); } @@ -179,6 +210,20 @@ public class RequestQueueService : IHostedService bool enqueued = false; while (!enqueued) { + if (_priorityQueue.TryDequeue(out var prioRequest)) + { + if (_queueRemoval.TryGetValue(prioRequest.RequestId, out string user) && string.Equals(user, prioRequest.User, StringComparison.Ordinal)) + { + _logger.LogDebug("Request cancelled: {requestId} by {user}", prioRequest.RequestId, user); + _queueRemoval.Remove(prioRequest.RequestId, out _); + continue; + } + + await DequeueIntoSlotAsync(prioRequest, i).ConfigureAwait(false); + enqueued = true; + break; + } + if (_queue.TryDequeue(out var request)) { if (_queueRemoval.TryGetValue(request.RequestId, out string user) && string.Equals(user, request.User, StringComparison.Ordinal))