add alias to jwt, remove caching from auth, remove db usage from files
This commit is contained in:
@@ -5,9 +5,6 @@ using Microsoft.AspNetCore.SignalR;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Timers;
|
||||
using MareSynchronos.API.SignalR;
|
||||
using MareSynchronosShared.Data;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using System.Linq;
|
||||
|
||||
namespace MareSynchronosStaticFilesServer.Services;
|
||||
|
||||
@@ -25,7 +22,6 @@ public class RequestQueueService : IHostedService
|
||||
private readonly ConcurrentDictionary<Guid, string> _queueRemoval = new();
|
||||
private readonly SemaphoreSlim _queueSemaphore = new(1);
|
||||
private readonly UserQueueEntry[] _userQueueRequests;
|
||||
private readonly ConcurrentDictionary<string, PriorityEntry> _priorityCache = new(StringComparer.Ordinal);
|
||||
private int _queueLimitForReset;
|
||||
private readonly int _queueReleaseSeconds;
|
||||
private System.Timers.Timer _queueTimer;
|
||||
@@ -49,50 +45,11 @@ public class RequestQueueService : IHostedService
|
||||
req.MarkActive();
|
||||
}
|
||||
|
||||
private async Task<bool> IsHighPriority(string uid, MareDbContext mareDbContext)
|
||||
{
|
||||
return false;
|
||||
if (!_priorityCache.TryGetValue(uid, out PriorityEntry entry) || entry.LastChecked.Add(TimeSpan.FromHours(6)) < DateTime.UtcNow)
|
||||
{
|
||||
var user = await mareDbContext.Users.FirstOrDefaultAsync(u => u.UID == uid).ConfigureAwait(false);
|
||||
entry = new(user != null && !string.IsNullOrEmpty(user.Alias), DateTime.UtcNow);
|
||||
_priorityCache[uid] = entry;
|
||||
}
|
||||
|
||||
return entry.IsHighPriority;
|
||||
}
|
||||
|
||||
public async Task EnqueueUser(UserRequest request, MareDbContext mareDbContext)
|
||||
public void EnqueueUser(UserRequest request, bool isPriority)
|
||||
{
|
||||
_logger.LogDebug("Enqueueing req {guid} from {user} for {file}", request.RequestId, request.User, string.Join(", ", request.FileIds));
|
||||
|
||||
bool isPriorityQueue = await IsHighPriority(request.User, mareDbContext).ConfigureAwait(false);
|
||||
|
||||
if (_queueProcessingSemaphore.CurrentCount == 0)
|
||||
{
|
||||
if (isPriorityQueue) _priorityQueue.Enqueue(request);
|
||||
else _queue.Enqueue(request);
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await _queueSemaphore.WaitAsync().ConfigureAwait(false);
|
||||
if (isPriorityQueue) _priorityQueue.Enqueue(request);
|
||||
else _queue.Enqueue(request);
|
||||
|
||||
return;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error during EnqueueUser");
|
||||
}
|
||||
finally
|
||||
{
|
||||
_queueSemaphore.Release();
|
||||
}
|
||||
|
||||
throw new Exception("Error during EnqueueUser");
|
||||
GetQueue(isPriority).Enqueue(request);
|
||||
}
|
||||
|
||||
public void FinishRequest(Guid request)
|
||||
@@ -117,9 +74,10 @@ public class RequestQueueService : IHostedService
|
||||
return userQueueRequest != null && userRequest != null && userQueueRequest.ExpirationDate > DateTime.UtcNow;
|
||||
}
|
||||
|
||||
public void RemoveFromQueue(Guid requestId, string user)
|
||||
public void RemoveFromQueue(Guid requestId, string user, bool isPriority)
|
||||
{
|
||||
if (!_queue.Any(f => f.RequestId == requestId && string.Equals(f.User, user, StringComparison.Ordinal)))
|
||||
var existingRequest = GetQueue(isPriority).FirstOrDefault(f => f.RequestId == requestId && string.Equals(f.User, user, StringComparison.Ordinal));
|
||||
if (existingRequest == null)
|
||||
{
|
||||
var activeSlot = _userQueueRequests.FirstOrDefault(r => r != null && string.Equals(r.UserRequest.User, user, StringComparison.Ordinal) && r.UserRequest.RequestId == requestId);
|
||||
if (activeSlot != null)
|
||||
@@ -145,14 +103,11 @@ public class RequestQueueService : IHostedService
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public async Task<bool> StillEnqueued(Guid request, string user, MareDbContext mareDbContext)
|
||||
private ConcurrentQueue<UserRequest> GetQueue(bool isPriority) => isPriority ? _priorityQueue : _queue;
|
||||
|
||||
public bool StillEnqueued(Guid request, string user, bool isPriority)
|
||||
{
|
||||
bool isPriorityQueue = await IsHighPriority(user, mareDbContext).ConfigureAwait(false);
|
||||
if (isPriorityQueue)
|
||||
{
|
||||
return _priorityQueue.Any(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal));
|
||||
}
|
||||
return _queue.Any(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal));
|
||||
return GetQueue(isPriority).Any(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal));
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
@@ -182,11 +137,7 @@ public class RequestQueueService : IHostedService
|
||||
return;
|
||||
}
|
||||
|
||||
Parallel.For(0, _userQueueRequests.Length, new ParallelOptions()
|
||||
{
|
||||
MaxDegreeOfParallelism = 10,
|
||||
},
|
||||
async (i) =>
|
||||
for (int i = 0; i < _userQueueRequests.Length; i++)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -246,7 +197,7 @@ public class RequestQueueService : IHostedService
|
||||
{
|
||||
_logger.LogWarning(ex, "Error during inside queue processing");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user