 42b15cb6b7
			
		
	
	42b15cb6b7
	
	
	
		
			
			* test add queueing to file service
			* further adjustments to download queueing
			* add check for whether the request is still in the queue to CheckQueue
			* forcefully release slot if download didn't finish in 15s
			* actually cancel the delay task
			* add metrics and refactor some of the request queue service
			* refactor pathing
			* reuse httpclient
			* add queue request dto to requestfile, enqueue users immediately if a slot is available
			* change startup to include all controllers
			* update server pathing
			* update pathing, again
			* several adjustments to auth, banning, jwt server tokens, renaming, authorization
			* update api I guess
			* adjust automated banning of charaident and reg
			* generate jwt on servers for internal authentication
			* remove mvcextensions

			Co-authored-by: rootdarkarchon <root.darkarchon@outlook.com>
		
			
				
	
	
		
			138 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
			
		
		
	
	
			138 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
| using MareSynchronos.API;
 | |
| using MareSynchronosShared.Metrics;
 | |
| using MareSynchronosShared.Services;
 | |
| using MareSynchronosStaticFilesServer.Utils;
 | |
| using System.Collections.Concurrent;
 | |
| 
 | |
| namespace MareSynchronosStaticFilesServer.Services;
 | |
| 
 | |
/// <summary>
/// Hosted service that limits concurrent downloads. A fixed-size array of slots holds the requests
/// that are currently allowed to download; requests that do not get a slot wait in a FIFO queue and
/// are moved into freed slots by a background loop.
/// </summary>
public class RequestQueueService : IHostedService
{
    private readonly CancellationTokenSource _queueCts = new();
    // One entry per download slot; a null element means the slot is free.
    private readonly UserQueueEntry[] _userQueueRequests;
    private readonly ConcurrentQueue<UserRequest> _queue = new();
    private readonly MareMetrics _metrics;
    private readonly ILogger<RequestQueueService> _logger;
    private readonly int _queueExpirationSeconds;
    // Guards slot assignment between EnqueueUser and the background loop.
    private readonly SemaphoreSlim _queueSemaphore = new(1);
    // Held by the background loop while it redistributes slots; EnqueueUser only peeks at its count.
    private readonly SemaphoreSlim _queueProcessingSemaphore = new(1);

    /// <summary>
    /// Creates the service, reading the slot count (<c>DownloadQueueSize</c>, default 50) and the slot
    /// expiration in seconds (<c>DownloadTimeoutSeconds</c>, default 5) from configuration.
    /// </summary>
    public RequestQueueService(MareMetrics metrics, IConfigurationService<StaticFilesServerConfiguration> configurationService, ILogger<RequestQueueService> logger)
    {
        _userQueueRequests = new UserQueueEntry[configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueSize), 50)];
        _queueExpirationSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadTimeoutSeconds), 5);
        _metrics = metrics;
        _logger = logger;
    }

    /// <summary>
    /// Registers a download request. The request is placed directly into a free slot when one exists,
    /// otherwise it is appended to the waiting queue.
    /// </summary>
    /// <param name="request">The request to register.</param>
    /// <returns>
    /// <see cref="QueueStatus.Ready"/> if the request received a slot immediately,
    /// <see cref="QueueStatus.Waiting"/> if it was queued.
    /// </returns>
    public async Task<QueueStatus> EnqueueUser(UserRequest request)
    {
        _logger.LogDebug("Enqueueing req {guid} from {user} for {file}", request.RequestId, request.User, request.FileId);

        // The background loop is currently redistributing slots: queue the request and let the loop place it.
        if (_queueProcessingSemaphore.CurrentCount == 0)
        {
            _queue.Enqueue(request);
            return QueueStatus.Waiting;
        }

        await _queueSemaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            var freeSlot = Array.FindIndex(_userQueueRequests, r => r == null);
            if (freeSlot == -1)
            {
                _queue.Enqueue(request);
                return QueueStatus.Waiting;
            }

            DequeueIntoSlot(request, freeSlot);
            return QueueStatus.Ready;
        }
        finally
        {
            // Always release, otherwise a single failure would block every later enqueue.
            _queueSemaphore.Release();
        }
    }

    /// <summary>
    /// Checks whether a request is still in the waiting queue.
    /// </summary>
    /// <param name="request">Id of the request.</param>
    /// <param name="user">User owning the request (ordinal comparison).</param>
    /// <param name="queuePosition">Zero-based position in the queue, or -1 when the request is not queued.</param>
    /// <returns><c>true</c> if the request was found in the waiting queue.</returns>
    public bool StillEnqueued(Guid request, string user, out int queuePosition)
    {
        // Work on a single snapshot so the returned flag and position always agree,
        // even if the background loop dequeues entries concurrently.
        var snapshot = _queue.ToArray();
        queuePosition = Array.FindIndex(snapshot, c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal));
        return queuePosition >= 0;
    }

    /// <summary>
    /// Checks whether a request currently occupies a slot whose expiration date has not passed yet.
    /// </summary>
    /// <param name="request">Id of the request.</param>
    /// <param name="user">User owning the request (ordinal comparison).</param>
    /// <param name="userRequest">The request stored in the slot, or <c>null</c> when no slot matches.</param>
    /// <returns><c>true</c> if a matching, non-expired slot entry exists.</returns>
    public bool IsActiveProcessing(Guid request, string user, out UserRequest userRequest)
    {
        var entry = Array.Find(_userQueueRequests, e => e != null
            && e.UserRequest.RequestId == request
            && string.Equals(e.UserRequest.User, user, StringComparison.Ordinal));
        userRequest = entry?.UserRequest;
        return entry != null && userRequest != null && entry.ExpirationDate > DateTime.UtcNow;
    }

    /// <summary>
    /// Frees the slot held by the given request. Does nothing (besides logging) when no slot holds it.
    /// </summary>
    /// <param name="request">Id of the request that finished.</param>
    public void FinishRequest(Guid request)
    {
        // Free slots are null, so the predicate must guard against them.
        var idx = Array.FindIndex(_userQueueRequests, e => e != null && e.UserRequest.RequestId == request);
        if (idx == -1)
        {
            _logger.LogDebug("Finishing Request {guid}, no slot to clear", request);
            return;
        }

        _logger.LogDebug("Finishing Request {guid}, clearing slot {idx}", request, idx);
        _userQueueRequests[idx] = null;
    }

    /// <summary>
    /// Marks the slot entry of the given request as active, which exempts it from expiration-based eviction.
    /// </summary>
    /// <param name="request">Id of the request to activate.</param>
    /// <exception cref="InvalidOperationException">No slot currently holds the request.</exception>
    public void ActivateRequest(Guid request)
    {
        _logger.LogDebug("Activating request {guid}", request);

        // Free slots are null, so the predicate must guard against them.
        var entry = Array.Find(_userQueueRequests, e => e != null && e.UserRequest.RequestId == request);
        if (entry == null)
        {
            throw new InvalidOperationException($"Request {request} does not occupy a download slot");
        }

        entry.IsActive = true;
    }

    /// <summary>
    /// Background loop: every 250 ms evicts expired inactive slot entries, refills free slots from the
    /// waiting queue and publishes the queue length gauge. Ends when <paramref name="ct"/> is cancelled.
    /// </summary>
    private async Task ProcessRequestQueue(CancellationToken ct)
    {
        try
        {
            while (!ct.IsCancellationRequested)
            {
                try
                {
                    await FillFreeSlotsAsync(ct).ConfigureAwait(false);
                    _metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueue, _queue.Count);
                }
                catch (Exception ex) when (ex is not OperationCanceledException)
                {
                    // This task is not awaited by anyone; letting it fault would silently stop all queue processing.
                    _logger.LogWarning(ex, "Error while processing the download request queue");
                }

                await Task.Delay(250, ct).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the regular way to stop the loop.
        }
    }

    /// <summary>
    /// Performs one pass over the slots while holding both semaphores, releasing them even on failure.
    /// </summary>
    private async Task FillFreeSlotsAsync(CancellationToken ct)
    {
        await _queueProcessingSemaphore.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            await _queueSemaphore.WaitAsync(ct).ConfigureAwait(false);
            try
            {
                for (int i = 0; i < _userQueueRequests.Length; i++)
                {
                    var entry = _userQueueRequests[i];

                    // Evict entries that were granted a slot but never activated before expiring.
                    if (entry != null && !entry.IsActive && entry.ExpirationDate < DateTime.UtcNow)
                    {
                        _userQueueRequests[i] = null;
                    }

                    if (_userQueueRequests[i] == null && _queue.TryDequeue(out var request))
                    {
                        DequeueIntoSlot(request, i);
                    }

                    // Nothing left to place: stop scanning for this pass.
                    if (_queue.IsEmpty) break;
                }
            }
            finally
            {
                _queueSemaphore.Release();
            }
        }
        finally
        {
            _queueProcessingSemaphore.Release();
        }
    }

    /// <summary>
    /// Stores the request in the given slot with an expiration date of now plus the configured timeout.
    /// </summary>
    private void DequeueIntoSlot(UserRequest userRequest, int slot)
    {
        _logger.LogDebug("Dequeueing {req} into {i}: {user} with {file}", userRequest.RequestId, slot, userRequest.User, userRequest.FileId);
        _userQueueRequests[slot] = new(userRequest, DateTime.UtcNow.AddSeconds(_queueExpirationSeconds));
    }

    /// <summary>
    /// Starts the background queue processing loop without awaiting it.
    /// </summary>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        _ = Task.Run(() => ProcessRequestQueue(_queueCts.Token));
        return Task.CompletedTask;
    }

    /// <summary>
    /// Signals the background queue processing loop to stop.
    /// </summary>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        _queueCts.Cancel();
        return Task.CompletedTask;
    }
}
 |