try processing queue with timer instead of task
This commit is contained in:
@@ -3,12 +3,12 @@ using MareSynchronosShared.Metrics;
|
|||||||
using MareSynchronosShared.Services;
|
using MareSynchronosShared.Services;
|
||||||
using MareSynchronosStaticFilesServer.Utils;
|
using MareSynchronosStaticFilesServer.Utils;
|
||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
|
using System.Timers;
|
||||||
|
|
||||||
namespace MareSynchronosStaticFilesServer.Services;
|
namespace MareSynchronosStaticFilesServer.Services;
|
||||||
|
|
||||||
public class RequestQueueService : IHostedService
|
public class RequestQueueService : IHostedService
|
||||||
{
|
{
|
||||||
private CancellationTokenSource _queueCts = new();
|
|
||||||
private readonly UserQueueEntry[] _userQueueRequests;
|
private readonly UserQueueEntry[] _userQueueRequests;
|
||||||
private readonly ConcurrentQueue<UserRequest> _queue = new();
|
private readonly ConcurrentQueue<UserRequest> _queue = new();
|
||||||
private readonly MareMetrics _metrics;
|
private readonly MareMetrics _metrics;
|
||||||
@@ -16,6 +16,8 @@ public class RequestQueueService : IHostedService
|
|||||||
private readonly int _queueExpirationSeconds;
|
private readonly int _queueExpirationSeconds;
|
||||||
private SemaphoreSlim _queueSemaphore = new(1);
|
private SemaphoreSlim _queueSemaphore = new(1);
|
||||||
private SemaphoreSlim _queueProcessingSemaphore = new(1);
|
private SemaphoreSlim _queueProcessingSemaphore = new(1);
|
||||||
|
private bool _isProcessingQueue = false;
|
||||||
|
private System.Timers.Timer _queueTimer;
|
||||||
|
|
||||||
public RequestQueueService(MareMetrics metrics, IConfigurationService<StaticFilesServerConfiguration> configurationService, ILogger<RequestQueueService> logger)
|
public RequestQueueService(MareMetrics metrics, IConfigurationService<StaticFilesServerConfiguration> configurationService, ILogger<RequestQueueService> logger)
|
||||||
{
|
{
|
||||||
@@ -100,44 +102,43 @@ public class RequestQueueService : IHostedService
|
|||||||
_userQueueRequests.Where(f => f != null).First(f => f.UserRequest.RequestId == request).IsActive = true;
|
_userQueueRequests.Where(f => f != null).First(f => f.UserRequest.RequestId == request).IsActive = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task ProcessRequestQueue(CancellationToken ct)
|
private async void ProcessQueue(object src, ElapsedEventArgs e)
|
||||||
{
|
{
|
||||||
while (!ct.IsCancellationRequested)
|
if (_isProcessingQueue) return;
|
||||||
|
|
||||||
|
_isProcessingQueue = true;
|
||||||
|
try
|
||||||
{
|
{
|
||||||
try
|
await _queueProcessingSemaphore.WaitAsync().ConfigureAwait(false);
|
||||||
|
await _queueSemaphore.WaitAsync().ConfigureAwait(false);
|
||||||
|
for (int i = 0; i < _userQueueRequests.Length; i++)
|
||||||
{
|
{
|
||||||
await _queueProcessingSemaphore.WaitAsync(ct).ConfigureAwait(false);
|
if (_userQueueRequests[i] != null && !_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow) _userQueueRequests[i] = null;
|
||||||
await _queueSemaphore.WaitAsync(ct).ConfigureAwait(false);
|
|
||||||
for (int i = 0; i < _userQueueRequests.Length; i++)
|
if (_userQueueRequests[i] == null)
|
||||||
{
|
{
|
||||||
if (_userQueueRequests[i] != null && !_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow) _userQueueRequests[i] = null;
|
if (_queue.TryDequeue(out var request))
|
||||||
|
|
||||||
if (_userQueueRequests[i] == null)
|
|
||||||
{
|
{
|
||||||
if (_queue.TryDequeue(out var request))
|
DequeueIntoSlot(request, i);
|
||||||
{
|
|
||||||
DequeueIntoSlot(request, i);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!_queue.Any()) break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
if (!_queue.Any()) break;
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, "Error during Queue processing");
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
_queueProcessingSemaphore.Release();
|
|
||||||
_queueSemaphore.Release();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueue, _queue.Count);
|
|
||||||
|
|
||||||
await Task.Delay(250).ConfigureAwait(false);
|
|
||||||
}
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Error during Queue processing");
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_queueProcessingSemaphore.Release();
|
||||||
|
_queueSemaphore.Release();
|
||||||
|
}
|
||||||
|
|
||||||
|
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueue, _queue.Count);
|
||||||
|
_isProcessingQueue = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void DequeueIntoSlot(UserRequest userRequest, int slot)
|
private void DequeueIntoSlot(UserRequest userRequest, int slot)
|
||||||
@@ -148,13 +149,15 @@ public class RequestQueueService : IHostedService
|
|||||||
|
|
||||||
public Task StartAsync(CancellationToken cancellationToken)
|
public Task StartAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
_ = Task.Run(() => ProcessRequestQueue(_queueCts.Token));
|
_queueTimer = new System.Timers.Timer(250);
|
||||||
|
_queueTimer.Elapsed += ProcessQueue;
|
||||||
|
_queueTimer.AutoReset = true;
|
||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task StopAsync(CancellationToken cancellationToken)
|
public Task StopAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
_queueCts.Cancel();
|
_queueTimer.Stop();
|
||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user