Download rework (#22)

* rework server to send download ready back via signalr

* adjust queue handling for removal

* adjust api to main

Co-authored-by: rootdarkarchon <root.darkarchon@outlook.com>
This commit is contained in:
rootdarkarchon
2023-01-15 16:00:38 +01:00
committed by GitHub
parent 5e0e21ef68
commit 4bd71a5889
15 changed files with 165 additions and 87 deletions

View File

@@ -37,7 +37,8 @@
"MainServerGrpcAddress": "http://mare-server:6005",
"DownloadTimeoutSeconds": 30,
"DownloadQueueSize": 50,
"DownloadQueueReleaseSeconds": 15
"DownloadQueueReleaseSeconds": 15,
"RedisConnectionString": "redis,password=secretredispassword"
},
"AllowedHosts": "*",
"Kestrel": {

View File

@@ -37,7 +37,8 @@
"MainServerGrpcAddress": "http://mare-server:6005",
"DownloadTimeoutSeconds": 30,
"DownloadQueueSize": 50,
"DownloadQueueReleaseSeconds": 15
"DownloadQueueReleaseSeconds": 15,
"RedisConnectionString": "redis,password=secretredispassword"
},
"AllowedHosts": "*",
"Kestrel": {

View File

@@ -34,7 +34,8 @@
"UnusedFileRetentionPeriodInDays": 14,
"CacheDirectory": "/marecache/",
"RemoteCacheSourceUri": "",
"MainServerGrpcAddress": "http://mare-server:6005"
"MainServerGrpcAddress": "http://mare-server:6005",
"RedisConnectionString": "redis,password=secretredispassword"
},
"AllowedHosts": "*",
"Kestrel": {

View File

@@ -34,7 +34,8 @@
"UnusedFileRetentionPeriodInDays": 14,
"CacheDirectory": "/marecache/",
"RemoteCacheSourceUri": "",
"MainServerGrpcAddress": "http://mare-server:6005"
"MainServerGrpcAddress": "http://mare-server:6005",
"RedisConnectionString": "redis,password=secretredispassword"
},
"AllowedHosts": "*",
"Kestrel": {

View File

@@ -63,5 +63,10 @@ namespace MareSynchronosServer.Hubs
{
throw new PlatformNotSupportedException("Calling clientside method on server not supported");
}
public Task Client_DownloadReady(Guid requestId)
{
throw new PlatformNotSupportedException("Calling clientside method on server not supported");
}
}
}

View File

@@ -83,15 +83,6 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
};
}
[Authorize(Policy = "Authenticated")]
public async Task<ConnectionDto> Heartbeat(string characterIdentification)
{
return new ConnectionDto()
{
ServerVersion = IMareHub.ApiVersion,
};
}
[Authorize(Policy = "Authenticated")]
public async Task<bool> CheckClientHealth()
{

View File

@@ -10,7 +10,6 @@ using MareSynchronosShared.Protos;
using Grpc.Net.Client.Configuration;
using MareSynchronosShared.Metrics;
using MareSynchronosServer.Services;
using MareSynchronosServer.Utils;
using MareSynchronosServer.RequirementHandlers;
using MareSynchronosShared.Utils;
using MareSynchronosShared.Services;
@@ -80,7 +79,6 @@ public class Startup
services.AddSingleton<ServerTokenGenerator>();
services.AddSingleton<SystemInfoService>();
services.AddSingleton<IUserIdProvider, IdBasedUserIdProvider>();
services.AddHostedService(provider => provider.GetService<SystemInfoService>());
// configure services based on main server status
ConfigureServicesBasedOnShardType(services, mareConfig, isMainServer);
@@ -94,6 +92,8 @@ public class Startup
private static void ConfigureSignalR(IServiceCollection services, IConfigurationSection mareConfig)
{
services.AddSingleton<IUserIdProvider, IdBasedUserIdProvider>();
var signalRServiceBuilder = services.AddSignalR(hubOptions =>
{
hubOptions.MaximumReceiveMessageSize = long.MaxValue;
@@ -106,13 +106,7 @@ public class Startup
// configure redis for SignalR
var redisConnection = mareConfig.GetValue(nameof(ServerConfiguration.RedisConnectionString), string.Empty);
if (!string.IsNullOrEmpty(redisConnection))
{
signalRServiceBuilder.AddStackExchangeRedis(redisConnection, options =>
{
options.Configuration.ChannelPrefix = "MareSynchronos";
});
}
signalRServiceBuilder.AddStackExchangeRedis(redisConnection, options => { });
var options = ConfigurationOptions.Parse(redisConnection);

View File

@@ -1,7 +1,6 @@
using MareSynchronosShared.Utils;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR;
namespace MareSynchronosServer.Utils;
namespace MareSynchronosShared.Utils;
public class IdBasedUserIdProvider : IUserIdProvider
{

View File

@@ -2,7 +2,6 @@
using MareSynchronosShared.Utils;
using MareSynchronosStaticFilesServer.Services;
using Microsoft.AspNetCore.Mvc;
using System.Text.Json;
namespace MareSynchronosStaticFilesServer.Controllers;
@@ -20,6 +19,23 @@ public class RequestController : ControllerBase
_requestQueue = requestQueue;
}
[HttpGet]
[Route(MareFiles.Request_Cancel)]
public async Task<IActionResult> CancelQueueRequest(Guid requestId)
{
try
{
await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted);
_requestQueue.RemoveFromQueue(requestId, MareUser);
return Ok();
}
catch (OperationCanceledException) { return BadRequest(); }
finally
{
_parallelRequestSemaphore.Release();
}
}
[HttpPost]
[Route(MareFiles.Request_Enqueue)]
public async Task<IActionResult> PreRequestFilesAsync([FromBody] List<string> files)
@@ -51,34 +67,8 @@ public class RequestController : ControllerBase
await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted);
Guid g = Guid.NewGuid();
_cachedFileProvider.DownloadFileWhenRequired(file, Authorization);
var queueStatus = await _requestQueue.EnqueueUser(new(g, MareUser, file));
return Ok(JsonSerializer.Serialize(new QueueRequestDto(g, queueStatus)));
}
catch (OperationCanceledException) { return BadRequest(); }
finally
{
_parallelRequestSemaphore.Release();
}
}
[HttpGet]
[Route(MareFiles.Request_CheckQueue)]
public async Task<IActionResult> CheckQueueAsync(Guid requestId)
{
try
{
await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted);
if (_requestQueue.IsActiveProcessing(requestId, MareUser, out _))
{
return Ok();
}
if (_requestQueue.StillEnqueued(requestId, MareUser))
{
return Conflict();
}
return BadRequest();
await _requestQueue.EnqueueUser(new(g, MareUser, file));
return Ok(g);
}
catch (OperationCanceledException) { return BadRequest(); }
finally

View File

@@ -18,7 +18,7 @@ public class ServerFilesController : ControllerBase
[HttpGet(MareFiles.ServerFiles_Get + "/{fileId}")]
[Authorize(Policy = "Internal")]
public async Task<IActionResult> GetFile(string fileId)
public IActionResult GetFile(string fileId)
{
_logger.LogInformation($"GetFile:{MareUser}:{fileId}");

View File

@@ -0,0 +1,25 @@
using Microsoft.AspNetCore.SignalR;
// this is a very hacky way to attach this file server to the main mare hub signalr instance via redis
// signalr publishes the namespace and hubname into the redis backend so this needs to be equal to the original
// but I don't need to reimplement the hub completely as I only exclusively use it for internal connection calling
// from the queue service so I keep the namespace and name of the class the same so it can connect to the same channel
// if anyone finds a better way to do this let me know
#pragma warning disable IDE0130 // Namespace does not match folder structure
#pragma warning disable MA0048 // File name must match type name
namespace MareSynchronosServer.Hubs;
public class MareHub : Hub
{
public override Task OnConnectedAsync()
{
throw new NotSupportedException();
}
public override Task OnDisconnectedAsync(Exception exception)
{
throw new NotSupportedException();
}
}
#pragma warning restore IDE0130 // Namespace does not match folder structure
#pragma warning restore MA0048 // File name must match type name

View File

@@ -25,6 +25,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="7.0.1" />
<PackageReference Include="Microsoft.AspNetCore.SignalR.StackExchangeRedis" Version="7.0.2" />
<PackageReference Include="Microsoft.Extensions.Hosting.Systemd" Version="7.0.0" />
<PackageReference Include="prometheus-net.AspNetCore" Version="7.0.0" />
</ItemGroup>

View File

@@ -2,6 +2,7 @@
using MareSynchronosShared.Metrics;
using MareSynchronosShared.Services;
using MareSynchronosStaticFilesServer.Utils;
using Microsoft.AspNetCore.SignalR;
using System.Collections.Concurrent;
using System.Timers;
@@ -13,46 +14,45 @@ public class RequestQueueService : IHostedService
private readonly ConcurrentQueue<UserRequest> _queue = new();
private readonly MareMetrics _metrics;
private readonly ILogger<RequestQueueService> _logger;
private readonly IHubContext<MareSynchronosServer.Hubs.MareHub> _hubContext;
private readonly int _queueExpirationSeconds;
private SemaphoreSlim _queueSemaphore = new(1);
private SemaphoreSlim _queueProcessingSemaphore = new(1);
private readonly SemaphoreSlim _queueSemaphore = new(1);
private readonly SemaphoreSlim _queueProcessingSemaphore = new(1);
private System.Timers.Timer _queueTimer;
private readonly ConcurrentDictionary<Guid, string> _queueRemoval = new();
public RequestQueueService(MareMetrics metrics, IConfigurationService<StaticFilesServerConfiguration> configurationService, ILogger<RequestQueueService> logger)
public RequestQueueService(MareMetrics metrics, IConfigurationService<StaticFilesServerConfiguration> configurationService, ILogger<RequestQueueService> logger, IHubContext<MareSynchronosServer.Hubs.MareHub> hubContext)
{
_userQueueRequests = new UserQueueEntry[configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueSize), 50)];
_queueExpirationSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadTimeoutSeconds), 5);
_metrics = metrics;
_logger = logger;
_hubContext = hubContext;
}
public async Task<QueueStatus> EnqueueUser(UserRequest request)
public async Task EnqueueUser(UserRequest request)
{
_logger.LogDebug("Enqueueing req {guid} from {user} for {file}", request.RequestId, request.User, request.FileId);
if (_queueProcessingSemaphore.CurrentCount == 0)
{
_queue.Enqueue(request);
return QueueStatus.Waiting;
}
try
{
await _queueSemaphore.WaitAsync().ConfigureAwait(false);
QueueStatus status = QueueStatus.Waiting;
var idx = Array.FindIndex(_userQueueRequests, r => r == null);
if (idx == -1)
{
_queue.Enqueue(request);
status = QueueStatus.Waiting;
}
else
{
DequeueIntoSlot(request, idx);
status = QueueStatus.Ready;
await DequeueIntoSlotAsync(request, idx).ConfigureAwait(false);
}
return status;
return;
}
catch (Exception ex)
{
@@ -66,22 +66,39 @@ public class RequestQueueService : IHostedService
throw new Exception("Error during EnqueueUser");
}
public void RemoveFromQueue(Guid requestId, string user)
{
if (!_queue.Any(f => f.RequestId == requestId && string.Equals(f.User, user, StringComparison.Ordinal)))
{
var activeSlot = _userQueueRequests.FirstOrDefault(r => r != null && string.Equals(r.UserRequest.User, user, StringComparison.Ordinal) && r.UserRequest.RequestId == requestId);
if (activeSlot != null)
{
var idx = Array.IndexOf(_userQueueRequests, activeSlot);
if (idx >= 0)
{
_userQueueRequests[idx] = null;
return;
}
}
}
_queueRemoval[requestId] = user;
}
public bool StillEnqueued(Guid request, string user)
{
return _queue.FirstOrDefault(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal)) != null;
return _queue.Any(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal));
}
public bool IsActiveProcessing(Guid request, string user, out UserRequest userRequest)
{
var userQueueRequest = _userQueueRequests.Where(u => u != null)
.FirstOrDefault(f => f.UserRequest.RequestId == request && string.Equals(f.UserRequest.User, user, StringComparison.Ordinal));
var userQueueRequest = _userQueueRequests.FirstOrDefault(u => u != null && u.UserRequest.RequestId == request && string.Equals(u.UserRequest.User, user, StringComparison.Ordinal));
userRequest = userQueueRequest?.UserRequest ?? null;
return userQueueRequest != null && userRequest != null && userQueueRequest.ExpirationDate > DateTime.UtcNow;
}
public void FinishRequest(Guid request)
{
var req = _userQueueRequests.Where(f => f != null).First(f => f.UserRequest.RequestId == request);
var req = _userQueueRequests.First(f => f != null && f.UserRequest.RequestId == request);
var idx = Array.IndexOf(_userQueueRequests, req);
_logger.LogDebug("Finishing Request {guid}, clearing slot {idx}", request, idx);
_userQueueRequests[idx] = null;
@@ -90,7 +107,7 @@ public class RequestQueueService : IHostedService
public void ActivateRequest(Guid request)
{
_logger.LogDebug("Activating request {guid}", request);
_userQueueRequests.Where(f => f != null).First(f => f.UserRequest.RequestId == request).IsActive = true;
_userQueueRequests.First(f => f != null && f.UserRequest.RequestId == request).IsActive = true;
}
private async void ProcessQueue(object src, ElapsedEventArgs e)
@@ -103,19 +120,35 @@ public class RequestQueueService : IHostedService
{
Parallel.For(0, _userQueueRequests.Length, new ParallelOptions()
{
MaxDegreeOfParallelism = 10
MaxDegreeOfParallelism = 10,
},
(i) =>
async (i) =>
{
if (!_queue.Any()) return;
if (_userQueueRequests[i] != null && !_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow) _userQueueRequests[i] = null;
if (_userQueueRequests[i] == null)
{
bool enqueued = false;
while (!enqueued)
{
if (_queue.TryDequeue(out var request))
{
DequeueIntoSlot(request, i);
if (_queueRemoval.TryGetValue(request.RequestId, out string user) && string.Equals(user, request.User, StringComparison.Ordinal))
{
_logger.LogDebug("Request cancelled: {requestId} by {user}", request.RequestId, user);
_queueRemoval.Remove(request.RequestId, out _);
continue;
}
await DequeueIntoSlotAsync(request, i).ConfigureAwait(false);
enqueued = true;
}
else
{
enqueued = true;
}
}
}
});
@@ -133,10 +166,11 @@ public class RequestQueueService : IHostedService
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueue, _queue.Count);
}
private void DequeueIntoSlot(UserRequest userRequest, int slot)
private async Task DequeueIntoSlotAsync(UserRequest userRequest, int slot)
{
_logger.LogDebug("Dequeueing {req} into {i}: {user} with {file}", userRequest.RequestId, slot, userRequest.User, userRequest.FileId);
_userQueueRequests[slot] = new(userRequest, DateTime.UtcNow.AddSeconds(_queueExpirationSeconds));
await _hubContext.Clients.User(userRequest.User).SendAsync(nameof(IMareHub.Client_DownloadReady), userRequest.RequestId).ConfigureAwait(false);
}
public Task StartAsync(CancellationToken cancellationToken)

View File

@@ -1,5 +1,6 @@
using Grpc.Net.Client.Configuration;
using Grpc.Net.ClientFactory;
using MareSynchronos.API;
using MareSynchronosShared.Data;
using MareSynchronosShared.Metrics;
using MareSynchronosShared.Protos;
@@ -10,10 +11,12 @@ using MareSynchronosStaticFilesServer.Utils;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Microsoft.IdentityModel.Tokens;
using Prometheus;
using StackExchange.Redis;
using System.Text;
namespace MareSynchronosStaticFilesServer;
@@ -162,6 +165,19 @@ public class Startup
services.AddHostedService(p => (MareConfigurationServiceClient<MareConfigurationAuthBase>)p.GetService<IConfigurationService<MareConfigurationAuthBase>>());
services.AddSingleton<IUserIdProvider, IdBasedUserIdProvider>();
var signalRServiceBuilder = services.AddSignalR(hubOptions =>
{
hubOptions.MaximumReceiveMessageSize = long.MaxValue;
hubOptions.EnableDetailedErrors = true;
hubOptions.MaximumParallelInvocationsPerClient = 10;
hubOptions.StreamBufferCapacity = 200;
});
// configure redis for SignalR
var redisConnection = mareConfig.GetValue(nameof(ServerConfiguration.RedisConnectionString), string.Empty);
signalRServiceBuilder.AddStackExchangeRedis(redisConnection, options => { });
services.AddHealthChecks();
services.AddControllers();
}
@@ -188,6 +204,7 @@ public class Startup
{
e.MapGrpcService<GrpcFileService>();
}
e.MapHub<MareSynchronosServer.Hubs.MareHub>("/dummyhub");
e.MapControllers();
e.MapHealthChecks("/health").WithMetadata(new AllowAnonymousAttribute());
});

View File

@@ -34,9 +34,17 @@ public class RequestFileStreamResult : FileStreamResult
}
public override void ExecuteResult(ActionContext context)
{
try
{
base.ExecuteResult(context);
}
catch
{
throw;
}
finally
{
_releaseCts.Cancel();
if (!_releasedSlot)
@@ -44,11 +52,20 @@ public class RequestFileStreamResult : FileStreamResult
_mareMetrics.DecGauge(MetricsAPI.GaugeCurrentDownloads);
}
}
public override async Task ExecuteResultAsync(ActionContext context)
{
try
{
await base.ExecuteResultAsync(context).ConfigureAwait(false);
}
catch
{
throw;
}
finally
{
_releaseCts.Cancel();
if (!_releasedSlot)
@@ -56,4 +73,5 @@ public class RequestFileStreamResult : FileStreamResult
_mareMetrics.DecGauge(MetricsAPI.GaugeCurrentDownloads);
}
}
}