some file server fixes I guess
break, not return fix queue processing fix queue processing, again fix queue processing, the third do not use async for queue processing something
This commit is contained in:
@@ -31,6 +31,9 @@ public class MetricsAPI
|
|||||||
public const string GaugeQueueActive = "mare_download_queue_active";
|
public const string GaugeQueueActive = "mare_download_queue_active";
|
||||||
public const string GaugeQueueInactive = "mare_download_queue_inactive";
|
public const string GaugeQueueInactive = "mare_download_queue_inactive";
|
||||||
public const string GaugeDownloadQueue = "mare_download_queue";
|
public const string GaugeDownloadQueue = "mare_download_queue";
|
||||||
|
public const string GaugeDownloadQueueCancelled = "mare_download_queue_cancelled";
|
||||||
|
public const string GaugeDownloadPriorityQueue = "mare_download_priority_queue";
|
||||||
|
public const string GaugeDownloadPriorityQueueCancelled = "mare_download_priority_queue_cancelled";
|
||||||
public const string CounterFileRequests = "mare_files_requests";
|
public const string CounterFileRequests = "mare_files_requests";
|
||||||
public const string CounterFileRequestSize = "mare_files_request_size";
|
public const string CounterFileRequestSize = "mare_files_request_size";
|
||||||
public const string CounterAccountsCreated = "mare_accounts_created";
|
public const string CounterAccountsCreated = "mare_accounts_created";
|
||||||
|
|||||||
@@ -9,7 +9,6 @@ public class RequestController : ControllerBase
|
|||||||
{
|
{
|
||||||
private readonly CachedFileProvider _cachedFileProvider;
|
private readonly CachedFileProvider _cachedFileProvider;
|
||||||
private readonly RequestQueueService _requestQueue;
|
private readonly RequestQueueService _requestQueue;
|
||||||
private static readonly SemaphoreSlim _parallelRequestSemaphore = new(500);
|
|
||||||
|
|
||||||
public RequestController(ILogger<RequestController> logger, CachedFileProvider cachedFileProvider, RequestQueueService requestQueue) : base(logger)
|
public RequestController(ILogger<RequestController> logger, CachedFileProvider cachedFileProvider, RequestQueueService requestQueue) : base(logger)
|
||||||
{
|
{
|
||||||
@@ -23,15 +22,10 @@ public class RequestController : ControllerBase
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted);
|
|
||||||
_requestQueue.RemoveFromQueue(requestId, MareUser, IsPriority);
|
_requestQueue.RemoveFromQueue(requestId, MareUser, IsPriority);
|
||||||
return Ok();
|
return Ok();
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException) { return BadRequest(); }
|
catch (OperationCanceledException) { return BadRequest(); }
|
||||||
finally
|
|
||||||
{
|
|
||||||
_parallelRequestSemaphore.Release();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[HttpPost]
|
[HttpPost]
|
||||||
@@ -40,7 +34,6 @@ public class RequestController : ControllerBase
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted);
|
|
||||||
foreach (var file in files)
|
foreach (var file in files)
|
||||||
{
|
{
|
||||||
_logger.LogDebug("Prerequested file: " + file);
|
_logger.LogDebug("Prerequested file: " + file);
|
||||||
@@ -48,15 +41,11 @@ public class RequestController : ControllerBase
|
|||||||
}
|
}
|
||||||
|
|
||||||
Guid g = Guid.NewGuid();
|
Guid g = Guid.NewGuid();
|
||||||
_requestQueue.EnqueueUser(new(g, MareUser, files.ToList()), IsPriority);
|
await _requestQueue.EnqueueUser(new(g, MareUser, files.ToList()), IsPriority, HttpContext.RequestAborted);
|
||||||
|
|
||||||
return Ok(g);
|
return Ok(g);
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException) { return BadRequest(); }
|
catch (OperationCanceledException) { return BadRequest(); }
|
||||||
finally
|
|
||||||
{
|
|
||||||
_parallelRequestSemaphore.Release();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[HttpGet]
|
[HttpGet]
|
||||||
@@ -66,7 +55,7 @@ public class RequestController : ControllerBase
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (!_requestQueue.StillEnqueued(requestId, MareUser, IsPriority))
|
if (!_requestQueue.StillEnqueued(requestId, MareUser, IsPriority))
|
||||||
_requestQueue.EnqueueUser(new(requestId, MareUser, files.ToList()), IsPriority);
|
await _requestQueue.EnqueueUser(new(requestId, MareUser, files.ToList()), IsPriority, HttpContext.RequestAborted);
|
||||||
return Ok();
|
return Ok();
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException) { return BadRequest(); }
|
catch (OperationCanceledException) { return BadRequest(); }
|
||||||
|
|||||||
@@ -19,7 +19,6 @@ public class RequestQueueService : IHostedService
|
|||||||
private readonly ConcurrentQueue<UserRequest> _priorityQueue = new();
|
private readonly ConcurrentQueue<UserRequest> _priorityQueue = new();
|
||||||
private readonly int _queueExpirationSeconds;
|
private readonly int _queueExpirationSeconds;
|
||||||
private readonly SemaphoreSlim _queueProcessingSemaphore = new(1);
|
private readonly SemaphoreSlim _queueProcessingSemaphore = new(1);
|
||||||
private readonly ConcurrentDictionary<Guid, string> _queueRemoval = new();
|
|
||||||
private readonly SemaphoreSlim _queueSemaphore = new(1);
|
private readonly SemaphoreSlim _queueSemaphore = new(1);
|
||||||
private readonly UserQueueEntry[] _userQueueRequests;
|
private readonly UserQueueEntry[] _userQueueRequests;
|
||||||
private int _queueLimitForReset;
|
private int _queueLimitForReset;
|
||||||
@@ -45,8 +44,13 @@ public class RequestQueueService : IHostedService
|
|||||||
req.MarkActive();
|
req.MarkActive();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void EnqueueUser(UserRequest request, bool isPriority)
|
public async Task EnqueueUser(UserRequest request, bool isPriority, CancellationToken token)
|
||||||
{
|
{
|
||||||
|
while (_queueProcessingSemaphore.CurrentCount == 0)
|
||||||
|
{
|
||||||
|
await Task.Delay(50, token).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
_logger.LogDebug("Enqueueing req {guid} from {user} for {file}", request.RequestId, request.User, string.Join(", ", request.FileIds));
|
_logger.LogDebug("Enqueueing req {guid} from {user} for {file}", request.RequestId, request.User, string.Join(", ", request.FileIds));
|
||||||
|
|
||||||
GetQueue(isPriority).Enqueue(request);
|
GetQueue(isPriority).Enqueue(request);
|
||||||
@@ -88,10 +92,11 @@ public class RequestQueueService : IHostedService
|
|||||||
_userQueueRequests[idx] = null;
|
_userQueueRequests[idx] = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
_queueRemoval[requestId] = user;
|
else
|
||||||
|
{
|
||||||
|
existingRequest.IsCancelled = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task StartAsync(CancellationToken cancellationToken)
|
public Task StartAsync(CancellationToken cancellationToken)
|
||||||
@@ -116,22 +121,22 @@ public class RequestQueueService : IHostedService
|
|||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task DequeueIntoSlotAsync(UserRequest userRequest, int slot)
|
private void DequeueIntoSlot(UserRequest userRequest, int slot)
|
||||||
{
|
{
|
||||||
_logger.LogDebug("Dequeueing {req} into {i}: {user} with {file}", userRequest.RequestId, slot, userRequest.User, string.Join(", ", userRequest.FileIds));
|
_logger.LogDebug("Dequeueing {req} into {i}: {user} with {file}", userRequest.RequestId, slot, userRequest.User, string.Join(", ", userRequest.FileIds));
|
||||||
_userQueueRequests[slot] = new(userRequest, DateTime.UtcNow.AddSeconds(_queueExpirationSeconds));
|
_userQueueRequests[slot] = new(userRequest, DateTime.UtcNow.AddSeconds(_queueExpirationSeconds));
|
||||||
await _hubContext.Clients.User(userRequest.User).SendAsync(nameof(IMareHub.Client_DownloadReady), userRequest.RequestId).ConfigureAwait(false);
|
_ = _hubContext.Clients.User(userRequest.User).SendAsync(nameof(IMareHub.Client_DownloadReady), userRequest.RequestId).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async void ProcessQueue(object src, ElapsedEventArgs e)
|
private void ProcessQueue(object src, ElapsedEventArgs e)
|
||||||
{
|
{
|
||||||
if (_queueProcessingSemaphore.CurrentCount == 0) return;
|
if (_queueProcessingSemaphore.CurrentCount == 0) return;
|
||||||
|
|
||||||
await _queueProcessingSemaphore.WaitAsync().ConfigureAwait(false);
|
_queueProcessingSemaphore.Wait();
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (_queue.Count > _queueLimitForReset)
|
if (_queue.Count(c => !c.IsCancelled) > _queueLimitForReset)
|
||||||
{
|
{
|
||||||
_queue.Clear();
|
_queue.Clear();
|
||||||
return;
|
return;
|
||||||
@@ -141,56 +146,36 @@ public class RequestQueueService : IHostedService
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (_userQueueRequests[i] != null && ((!_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow)))
|
if (_userQueueRequests[i] != null
|
||||||
|
&& (((!_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow))
|
||||||
|
|| (_userQueueRequests[i].IsActive && _userQueueRequests[i].ActivationDate < DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(_queueReleaseSeconds))))
|
||||||
|
)
|
||||||
{
|
{
|
||||||
_logger.LogDebug("Expiring inactive request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i);
|
_logger.LogDebug("Expiring request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i);
|
||||||
_userQueueRequests[i] = null;
|
_userQueueRequests[i] = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_userQueueRequests[i] != null && (_userQueueRequests[i].IsActive && _userQueueRequests[i].ActivationDate < DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(_queueReleaseSeconds))))
|
if (_userQueueRequests[i] != null) continue;
|
||||||
{
|
|
||||||
_logger.LogDebug("Expiring active request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i);
|
|
||||||
_userQueueRequests[i] = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!_queue.Any()) return;
|
while (true)
|
||||||
|
|
||||||
if (_userQueueRequests[i] == null)
|
|
||||||
{
|
{
|
||||||
bool enqueued = false;
|
if (_priorityQueue.TryDequeue(out var prioRequest))
|
||||||
while (!enqueued)
|
|
||||||
{
|
{
|
||||||
if (_priorityQueue.TryDequeue(out var prioRequest))
|
if (prioRequest.IsCancelled) continue;
|
||||||
{
|
|
||||||
if (_queueRemoval.TryGetValue(prioRequest.RequestId, out string user) && string.Equals(user, prioRequest.User, StringComparison.Ordinal))
|
|
||||||
{
|
|
||||||
_logger.LogDebug("Request cancelled: {requestId} by {user}", prioRequest.RequestId, user);
|
|
||||||
_queueRemoval.Remove(prioRequest.RequestId, out _);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
await DequeueIntoSlotAsync(prioRequest, i).ConfigureAwait(false);
|
DequeueIntoSlot(prioRequest, i);
|
||||||
enqueued = true;
|
break;
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_queue.TryDequeue(out var request))
|
|
||||||
{
|
|
||||||
if (_queueRemoval.TryGetValue(request.RequestId, out string user) && string.Equals(user, request.User, StringComparison.Ordinal))
|
|
||||||
{
|
|
||||||
_logger.LogDebug("Request cancelled: {requestId} by {user}", request.RequestId, user);
|
|
||||||
_queueRemoval.Remove(request.RequestId, out _);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
await DequeueIntoSlotAsync(request, i).ConfigureAwait(false);
|
|
||||||
enqueued = true;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
enqueued = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (_queue.TryDequeue(out var request))
|
||||||
|
{
|
||||||
|
if (request.IsCancelled) continue;
|
||||||
|
|
||||||
|
DequeueIntoSlot(request, i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
@@ -211,6 +196,9 @@ public class RequestQueueService : IHostedService
|
|||||||
_metrics.SetGaugeTo(MetricsAPI.GaugeQueueFree, _userQueueRequests.Count(c => c == null));
|
_metrics.SetGaugeTo(MetricsAPI.GaugeQueueFree, _userQueueRequests.Count(c => c == null));
|
||||||
_metrics.SetGaugeTo(MetricsAPI.GaugeQueueActive, _userQueueRequests.Count(c => c != null && c.IsActive));
|
_metrics.SetGaugeTo(MetricsAPI.GaugeQueueActive, _userQueueRequests.Count(c => c != null && c.IsActive));
|
||||||
_metrics.SetGaugeTo(MetricsAPI.GaugeQueueInactive, _userQueueRequests.Count(c => c != null && !c.IsActive));
|
_metrics.SetGaugeTo(MetricsAPI.GaugeQueueInactive, _userQueueRequests.Count(c => c != null && !c.IsActive));
|
||||||
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueue, _queue.Count);
|
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueue, _queue.Count(q => !q.IsCancelled));
|
||||||
|
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueueCancelled, _queue.Count(q => q.IsCancelled));
|
||||||
|
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadPriorityQueue, _priorityQueue.Count(q => !q.IsCancelled));
|
||||||
|
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadPriorityQueueCancelled, _priorityQueue.Count(q => q.IsCancelled));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -65,6 +65,9 @@ public class Startup
|
|||||||
MetricsAPI.GaugeFilesUniquePastHourSize,
|
MetricsAPI.GaugeFilesUniquePastHourSize,
|
||||||
MetricsAPI.GaugeCurrentDownloads,
|
MetricsAPI.GaugeCurrentDownloads,
|
||||||
MetricsAPI.GaugeDownloadQueue,
|
MetricsAPI.GaugeDownloadQueue,
|
||||||
|
MetricsAPI.GaugeDownloadQueueCancelled,
|
||||||
|
MetricsAPI.GaugeDownloadPriorityQueue,
|
||||||
|
MetricsAPI.GaugeDownloadPriorityQueueCancelled,
|
||||||
MetricsAPI.GaugeQueueFree,
|
MetricsAPI.GaugeQueueFree,
|
||||||
MetricsAPI.GaugeQueueInactive,
|
MetricsAPI.GaugeQueueInactive,
|
||||||
MetricsAPI.GaugeQueueActive,
|
MetricsAPI.GaugeQueueActive,
|
||||||
|
|||||||
@@ -1,3 +1,6 @@
|
|||||||
namespace MareSynchronosStaticFilesServer.Utils;
|
namespace MareSynchronosStaticFilesServer.Utils;
|
||||||
|
|
||||||
public record UserRequest(Guid RequestId, string User, List<string> FileIds);
|
public record UserRequest(Guid RequestId, string User, List<string> FileIds)
|
||||||
|
{
|
||||||
|
public bool IsCancelled { get; set; } = false;
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user