From ba37a25869c9d2926f7baea33b0620ffd5b2da8a Mon Sep 17 00:00:00 2001 From: rootdarkarchon Date: Wed, 2 Aug 2023 11:37:34 +0200 Subject: [PATCH] adjust queue processing --- .../Services/RequestQueueService.cs | 17 +++++++++-- .../Utils/RequestFileStreamResult.cs | 28 +++---------------- .../Utils/RequestFileStreamResultFactory.cs | 4 +-- .../Utils/UserQueueEntry.cs | 19 +++++++++++-- 4 files changed, 38 insertions(+), 30 deletions(-) diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs index 2be2bde..9bfb488 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs @@ -20,6 +20,7 @@ public class RequestQueueService : IHostedService private readonly SemaphoreSlim _queueSemaphore = new(1); private readonly UserQueueEntry[] _userQueueRequests; private int _queueLimitForReset; + private readonly int _queueReleaseSeconds; private System.Timers.Timer _queueTimer; public RequestQueueService(MareMetrics metrics, IConfigurationService configurationService, ILogger logger, IHubContext hubContext) @@ -27,6 +28,7 @@ public class RequestQueueService : IHostedService _userQueueRequests = new UserQueueEntry[configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueSize), 50)]; _queueExpirationSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadTimeoutSeconds), 5); _queueLimitForReset = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueClearLimit), 15000); + _queueReleaseSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueReleaseSeconds), 15); _metrics = metrics; _logger = logger; _hubContext = hubContext; @@ -35,7 +37,8 @@ public class RequestQueueService : IHostedService public void ActivateRequest(Guid request) { _logger.LogDebug("Activating request {guid}", request); - _userQueueRequests.First(f => f != null && f.UserRequest.RequestId == request).IsActive = true; + var req = _userQueueRequests.First(f => f != null && f.UserRequest.RequestId == request); + req.MarkActive(); } public async Task EnqueueUser(UserRequest request) @@ -152,7 +155,17 @@ public class RequestQueueService : IHostedService { if (!_queue.Any()) return; - if (_userQueueRequests[i] != null && !_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow) _userQueueRequests[i] = null; + if (_userQueueRequests[i] != null && ((!_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow))) + { + _logger.LogDebug("Expiring inactive request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i); + _userQueueRequests[i] = null; + } + + if (_userQueueRequests[i] != null && (_userQueueRequests[i].IsActive && _userQueueRequests[i].ActivationDate < DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(_queueReleaseSeconds)))) + { + _logger.LogDebug("Expiring active request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i); + _userQueueRequests[i] = null; + } if (_userQueueRequests[i] == null) { diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/RequestFileStreamResult.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/RequestFileStreamResult.cs index 388b648..207524d 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/RequestFileStreamResult.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/RequestFileStreamResult.cs @@ -9,28 +9,14 @@ public class RequestFileStreamResult : FileStreamResult private readonly Guid _requestId; private readonly RequestQueueService _requestQueueService; private readonly MareMetrics _mareMetrics; - private readonly CancellationTokenSource _releaseCts = new(); - private bool _releasedSlot = false; - public RequestFileStreamResult(Guid requestId, int secondsUntilRelease, RequestQueueService requestQueueService, - MareMetrics mareMetrics, Stream fileStream, string contentType) : base(fileStream, contentType) + public RequestFileStreamResult(Guid requestId, RequestQueueService requestQueueService, MareMetrics mareMetrics, + Stream fileStream, string contentType) : base(fileStream, contentType) { _requestId = requestId; _requestQueueService = requestQueueService; _mareMetrics = mareMetrics; _mareMetrics.IncGauge(MetricsAPI.GaugeCurrentDownloads); - - // forcefully release slot after secondsUntilRelease - _ = Task.Run(async () => - { - try - { - await Task.Delay(TimeSpan.FromSeconds(secondsUntilRelease), _releaseCts.Token).ConfigureAwait(false); - _requestQueueService.FinishRequest(_requestId); - _releasedSlot = true; - } - catch { } - }); } public override void ExecuteResult(ActionContext context) @@ -45,10 +31,7 @@ public class RequestFileStreamResult : FileStreamResult } finally { - _releaseCts.Cancel(); - - if (!_releasedSlot) - _requestQueueService.FinishRequest(_requestId); + _requestQueueService.FinishRequest(_requestId); _mareMetrics.DecGauge(MetricsAPI.GaugeCurrentDownloads); } @@ -66,10 +49,7 @@ public class RequestFileStreamResult : FileStreamResult } finally { - _releaseCts.Cancel(); - - if (!_releasedSlot) - _requestQueueService.FinishRequest(_requestId); + _requestQueueService.FinishRequest(_requestId); _mareMetrics.DecGauge(MetricsAPI.GaugeCurrentDownloads); } diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/RequestFileStreamResultFactory.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/RequestFileStreamResultFactory.cs index 8ce225c..e1e05ac 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/RequestFileStreamResultFactory.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/RequestFileStreamResultFactory.cs @@ -19,7 +19,7 @@ public class RequestFileStreamResultFactory public RequestFileStreamResult Create(Guid requestId, MemoryStream ms) { - return new RequestFileStreamResult(requestId, _configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueReleaseSeconds), 15), - _requestQueueService, _metrics, ms, "application/octet-stream"); + return new RequestFileStreamResult(requestId, _requestQueueService, + _metrics, ms, "application/octet-stream"); } } \ No newline at end of file diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/UserQueueEntry.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/UserQueueEntry.cs index 7f9c550..5646eca 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/UserQueueEntry.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/UserQueueEntry.cs @@ -1,6 +1,21 @@ namespace MareSynchronosStaticFilesServer.Utils; -public record UserQueueEntry(UserRequest UserRequest, DateTime ExpirationDate) +public class UserQueueEntry { - public bool IsActive { get; set; } = false; + public UserQueueEntry(UserRequest userRequest, DateTime expirationDate) + { + UserRequest = userRequest; + ExpirationDate = expirationDate; + } + + public void MarkActive() + { + IsActive = true; + ActivationDate = DateTime.UtcNow; + } + + public UserRequest UserRequest { get; } + public DateTime ExpirationDate { get; } + public bool IsActive { get; private set; } = false; + public DateTime ActivationDate { get; private set; } }