add high priority queue

This commit is contained in:
rootdarkarchon
2023-09-17 03:21:10 +02:00
committed by Loporrit
parent 3b77a179c5
commit 701a5763b5
2 changed files with 50 additions and 5 deletions

View File

@@ -65,7 +65,7 @@ public class RequestController : ControllerBase
{ {
try try
{ {
if (!_requestQueue.StillEnqueued(requestId, MareUser)) if (!await _requestQueue.StillEnqueued(requestId, MareUser))
await _requestQueue.EnqueueUser(new(requestId, MareUser, files.ToList())); await _requestQueue.EnqueueUser(new(requestId, MareUser, files.ToList()));
return Ok(); return Ok();
} }

View File

@@ -5,25 +5,34 @@ using Microsoft.AspNetCore.SignalR;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Timers; using System.Timers;
using MareSynchronos.API.SignalR; using MareSynchronos.API.SignalR;
using MareSynchronosShared.Data;
using Microsoft.EntityFrameworkCore;
using System.Linq;
namespace MareSynchronosStaticFilesServer.Services; namespace MareSynchronosStaticFilesServer.Services;
public class RequestQueueService : IHostedService public class RequestQueueService : IHostedService
{ {
private record PriorityEntry(bool IsHighPriority, DateTime LastChecked);
private readonly IHubContext<MareSynchronosServer.Hubs.MareHub> _hubContext; private readonly IHubContext<MareSynchronosServer.Hubs.MareHub> _hubContext;
private readonly MareDbContext _mareDbContext;
private readonly ILogger<RequestQueueService> _logger; private readonly ILogger<RequestQueueService> _logger;
private readonly MareMetrics _metrics; private readonly MareMetrics _metrics;
private readonly ConcurrentQueue<UserRequest> _queue = new(); private readonly ConcurrentQueue<UserRequest> _queue = new();
private readonly ConcurrentQueue<UserRequest> _priorityQueue = new();
private readonly int _queueExpirationSeconds; private readonly int _queueExpirationSeconds;
private readonly SemaphoreSlim _queueProcessingSemaphore = new(1); private readonly SemaphoreSlim _queueProcessingSemaphore = new(1);
private readonly ConcurrentDictionary<Guid, string> _queueRemoval = new(); private readonly ConcurrentDictionary<Guid, string> _queueRemoval = new();
private readonly SemaphoreSlim _queueSemaphore = new(1); private readonly SemaphoreSlim _queueSemaphore = new(1);
private readonly UserQueueEntry[] _userQueueRequests; private readonly UserQueueEntry[] _userQueueRequests;
private readonly ConcurrentDictionary<string, PriorityEntry> _priorityCache = new(StringComparer.Ordinal);
private int _queueLimitForReset; private int _queueLimitForReset;
private readonly int _queueReleaseSeconds; private readonly int _queueReleaseSeconds;
private System.Timers.Timer _queueTimer; private System.Timers.Timer _queueTimer;
public RequestQueueService(MareMetrics metrics, IConfigurationService<StaticFilesServerConfiguration> configurationService, ILogger<RequestQueueService> logger, IHubContext<MareSynchronosServer.Hubs.MareHub> hubContext) public RequestQueueService(MareMetrics metrics, IConfigurationService<StaticFilesServerConfiguration> configurationService,
ILogger<RequestQueueService> logger, IHubContext<MareSynchronosServer.Hubs.MareHub> hubContext, MareDbContext mareDbContext)
{ {
_userQueueRequests = new UserQueueEntry[configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueSize), 50)]; _userQueueRequests = new UserQueueEntry[configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueSize), 50)];
_queueExpirationSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadTimeoutSeconds), 5); _queueExpirationSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadTimeoutSeconds), 5);
@@ -32,6 +41,7 @@ public class RequestQueueService : IHostedService
_metrics = metrics; _metrics = metrics;
_logger = logger; _logger = logger;
_hubContext = hubContext; _hubContext = hubContext;
_mareDbContext = mareDbContext;
} }
public void ActivateRequest(Guid request) public void ActivateRequest(Guid request)
@@ -41,20 +51,36 @@ public class RequestQueueService : IHostedService
req.MarkActive(); req.MarkActive();
} }
private async Task<bool> IsHighPriority(string uid)
{
if (!_priorityCache.TryGetValue(uid, out PriorityEntry entry) || entry.LastChecked.Add(TimeSpan.FromHours(6)) < DateTime.UtcNow)
{
var user = await _mareDbContext.Users.FirstOrDefaultAsync(u => u.UID == uid).ConfigureAwait(false);
entry = new(user != null && !string.IsNullOrEmpty(user.Alias), DateTime.UtcNow);
_priorityCache[uid] = entry;
}
return entry.IsHighPriority;
}
public async Task EnqueueUser(UserRequest request) public async Task EnqueueUser(UserRequest request)
{ {
_logger.LogDebug("Enqueueing req {guid} from {user} for {file}", request.RequestId, request.User, string.Join(", ", request.FileIds)); _logger.LogDebug("Enqueueing req {guid} from {user} for {file}", request.RequestId, request.User, string.Join(", ", request.FileIds));
bool isPriorityQueue = await IsHighPriority(request.User).ConfigureAwait(false);
if (_queueProcessingSemaphore.CurrentCount == 0) if (_queueProcessingSemaphore.CurrentCount == 0)
{ {
_queue.Enqueue(request); if (isPriorityQueue) _priorityQueue.Enqueue(request);
else _queue.Enqueue(request);
return; return;
} }
try try
{ {
await _queueSemaphore.WaitAsync().ConfigureAwait(false); await _queueSemaphore.WaitAsync().ConfigureAwait(false);
_queue.Enqueue(request); if (isPriorityQueue) _priorityQueue.Enqueue(request);
else _queue.Enqueue(request);
return; return;
} }
@@ -120,8 +146,13 @@ public class RequestQueueService : IHostedService
return Task.CompletedTask; return Task.CompletedTask;
} }
public bool StillEnqueued(Guid request, string user) public async Task<bool> StillEnqueued(Guid request, string user)
{ {
bool isPriorityQueue = await IsHighPriority(user).ConfigureAwait(false);
if (isPriorityQueue)
{
return _priorityQueue.Any(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal));
}
return _queue.Any(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal)); return _queue.Any(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal));
} }
@@ -179,6 +210,20 @@ public class RequestQueueService : IHostedService
bool enqueued = false; bool enqueued = false;
while (!enqueued) while (!enqueued)
{ {
if (_priorityQueue.TryDequeue(out var prioRequest))
{
if (_queueRemoval.TryGetValue(prioRequest.RequestId, out string user) && string.Equals(user, prioRequest.User, StringComparison.Ordinal))
{
_logger.LogDebug("Request cancelled: {requestId} by {user}", prioRequest.RequestId, user);
_queueRemoval.Remove(prioRequest.RequestId, out _);
continue;
}
await DequeueIntoSlotAsync(prioRequest, i).ConfigureAwait(false);
enqueued = true;
break;
}
if (_queue.TryDequeue(out var request)) if (_queue.TryDequeue(out var request))
{ {
if (_queueRemoval.TryGetValue(request.RequestId, out string user) && string.Equals(user, request.User, StringComparison.Ordinal)) if (_queueRemoval.TryGetValue(request.RequestId, out string user) && string.Equals(user, request.User, StringComparison.Ordinal))