adjust queue processing

This commit is contained in:
rootdarkarchon
2023-08-02 11:37:34 +02:00
parent 255798c0c6
commit ba37a25869
4 changed files with 38 additions and 30 deletions

View File

@@ -20,6 +20,7 @@ public class RequestQueueService : IHostedService
private readonly SemaphoreSlim _queueSemaphore = new(1); private readonly SemaphoreSlim _queueSemaphore = new(1);
private readonly UserQueueEntry[] _userQueueRequests; private readonly UserQueueEntry[] _userQueueRequests;
private int _queueLimitForReset; private int _queueLimitForReset;
private readonly int _queueReleaseSeconds;
private System.Timers.Timer _queueTimer; private System.Timers.Timer _queueTimer;
public RequestQueueService(MareMetrics metrics, IConfigurationService<StaticFilesServerConfiguration> configurationService, ILogger<RequestQueueService> logger, IHubContext<MareSynchronosServer.Hubs.MareHub> hubContext) public RequestQueueService(MareMetrics metrics, IConfigurationService<StaticFilesServerConfiguration> configurationService, ILogger<RequestQueueService> logger, IHubContext<MareSynchronosServer.Hubs.MareHub> hubContext)
@@ -27,6 +28,7 @@ public class RequestQueueService : IHostedService
_userQueueRequests = new UserQueueEntry[configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueSize), 50)]; _userQueueRequests = new UserQueueEntry[configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueSize), 50)];
_queueExpirationSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadTimeoutSeconds), 5); _queueExpirationSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadTimeoutSeconds), 5);
_queueLimitForReset = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueClearLimit), 15000); _queueLimitForReset = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueClearLimit), 15000);
_queueReleaseSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueReleaseSeconds), 15);
_metrics = metrics; _metrics = metrics;
_logger = logger; _logger = logger;
_hubContext = hubContext; _hubContext = hubContext;
@@ -35,7 +37,8 @@ public class RequestQueueService : IHostedService
public void ActivateRequest(Guid request) public void ActivateRequest(Guid request)
{ {
_logger.LogDebug("Activating request {guid}", request); _logger.LogDebug("Activating request {guid}", request);
_userQueueRequests.First(f => f != null && f.UserRequest.RequestId == request).IsActive = true; var req = _userQueueRequests.First(f => f != null && f.UserRequest.RequestId == request);
req.MarkActive();
} }
public async Task EnqueueUser(UserRequest request) public async Task EnqueueUser(UserRequest request)
@@ -152,7 +155,17 @@ public class RequestQueueService : IHostedService
{ {
if (!_queue.Any()) return; if (!_queue.Any()) return;
if (_userQueueRequests[i] != null && !_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow) _userQueueRequests[i] = null; if (_userQueueRequests[i] != null && ((!_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow)))
{
_logger.LogDebug("Expiring inactive request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i);
_userQueueRequests[i] = null;
}
if (_userQueueRequests[i] != null && (_userQueueRequests[i].IsActive && _userQueueRequests[i].ActivationDate < DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(_queueReleaseSeconds))))
{
_logger.LogDebug("Expiring active request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i);
_userQueueRequests[i] = null;
}
if (_userQueueRequests[i] == null) if (_userQueueRequests[i] == null)
{ {

View File

@@ -9,28 +9,14 @@ public class RequestFileStreamResult : FileStreamResult
private readonly Guid _requestId; private readonly Guid _requestId;
private readonly RequestQueueService _requestQueueService; private readonly RequestQueueService _requestQueueService;
private readonly MareMetrics _mareMetrics; private readonly MareMetrics _mareMetrics;
private readonly CancellationTokenSource _releaseCts = new();
private bool _releasedSlot = false;
public RequestFileStreamResult(Guid requestId, int secondsUntilRelease, RequestQueueService requestQueueService, public RequestFileStreamResult(Guid requestId, RequestQueueService requestQueueService, MareMetrics mareMetrics,
MareMetrics mareMetrics, Stream fileStream, string contentType) : base(fileStream, contentType) Stream fileStream, string contentType) : base(fileStream, contentType)
{ {
_requestId = requestId; _requestId = requestId;
_requestQueueService = requestQueueService; _requestQueueService = requestQueueService;
_mareMetrics = mareMetrics; _mareMetrics = mareMetrics;
_mareMetrics.IncGauge(MetricsAPI.GaugeCurrentDownloads); _mareMetrics.IncGauge(MetricsAPI.GaugeCurrentDownloads);
// forcefully release slot after secondsUntilRelease
_ = Task.Run(async () =>
{
try
{
await Task.Delay(TimeSpan.FromSeconds(secondsUntilRelease), _releaseCts.Token).ConfigureAwait(false);
_requestQueueService.FinishRequest(_requestId);
_releasedSlot = true;
}
catch { }
});
} }
public override void ExecuteResult(ActionContext context) public override void ExecuteResult(ActionContext context)
@@ -45,10 +31,7 @@ public class RequestFileStreamResult : FileStreamResult
} }
finally finally
{ {
_releaseCts.Cancel(); _requestQueueService.FinishRequest(_requestId);
if (!_releasedSlot)
_requestQueueService.FinishRequest(_requestId);
_mareMetrics.DecGauge(MetricsAPI.GaugeCurrentDownloads); _mareMetrics.DecGauge(MetricsAPI.GaugeCurrentDownloads);
} }
@@ -66,10 +49,7 @@ public class RequestFileStreamResult : FileStreamResult
} }
finally finally
{ {
_releaseCts.Cancel(); _requestQueueService.FinishRequest(_requestId);
if (!_releasedSlot)
_requestQueueService.FinishRequest(_requestId);
_mareMetrics.DecGauge(MetricsAPI.GaugeCurrentDownloads); _mareMetrics.DecGauge(MetricsAPI.GaugeCurrentDownloads);
} }

View File

@@ -19,7 +19,7 @@ public class RequestFileStreamResultFactory
public RequestFileStreamResult Create(Guid requestId, MemoryStream ms) public RequestFileStreamResult Create(Guid requestId, MemoryStream ms)
{ {
return new RequestFileStreamResult(requestId, _configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueReleaseSeconds), 15), return new RequestFileStreamResult(requestId, _requestQueueService,
_requestQueueService, _metrics, ms, "application/octet-stream"); _metrics, ms, "application/octet-stream");
} }
} }

View File

@@ -1,6 +1,21 @@
namespace MareSynchronosStaticFilesServer.Utils; namespace MareSynchronosStaticFilesServer.Utils;
public record UserQueueEntry(UserRequest UserRequest, DateTime ExpirationDate) public class UserQueueEntry
{ {
public bool IsActive { get; set; } = false; public UserQueueEntry(UserRequest userRequest, DateTime expirationDate)
{
UserRequest = userRequest;
ExpirationDate = expirationDate;
}
public void MarkActive()
{
IsActive = true;
ActivationDate = DateTime.UtcNow;
}
public UserRequest UserRequest { get; }
public DateTime ExpirationDate { get; }
public bool IsActive { get; private set; } = false;
public DateTime ActivationDate { get; private set; }
} }