diff --git a/Docker/run/config/sharded/files-shard-1.json b/Docker/run/config/sharded/files-shard-1.json index 0124266..26b6549 100644 --- a/Docker/run/config/sharded/files-shard-1.json +++ b/Docker/run/config/sharded/files-shard-1.json @@ -37,7 +37,8 @@ "MainServerGrpcAddress": "http://mare-server:6005", "DownloadTimeoutSeconds": 30, "DownloadQueueSize": 50, - "DownloadQueueReleaseSeconds": 15 + "DownloadQueueReleaseSeconds": 15, + "RedisConnectionString": "redis,password=secretredispassword" }, "AllowedHosts": "*", "Kestrel": { diff --git a/Docker/run/config/sharded/files-shard-2.json b/Docker/run/config/sharded/files-shard-2.json index 60ce381..a187ac1 100644 --- a/Docker/run/config/sharded/files-shard-2.json +++ b/Docker/run/config/sharded/files-shard-2.json @@ -37,7 +37,8 @@ "MainServerGrpcAddress": "http://mare-server:6005", "DownloadTimeoutSeconds": 30, "DownloadQueueSize": 50, - "DownloadQueueReleaseSeconds": 15 + "DownloadQueueReleaseSeconds": 15, + "RedisConnectionString": "redis,password=secretredispassword" }, "AllowedHosts": "*", "Kestrel": { diff --git a/Docker/run/config/sharded/files-shard-main.json b/Docker/run/config/sharded/files-shard-main.json index 4b857de..222a6d1 100644 --- a/Docker/run/config/sharded/files-shard-main.json +++ b/Docker/run/config/sharded/files-shard-main.json @@ -34,7 +34,8 @@ "UnusedFileRetentionPeriodInDays": 14, "CacheDirectory": "/marecache/", "RemoteCacheSourceUri": "", - "MainServerGrpcAddress": "http://mare-server:6005" + "MainServerGrpcAddress": "http://mare-server:6005", + "RedisConnectionString": "redis,password=secretredispassword" }, "AllowedHosts": "*", "Kestrel": { diff --git a/Docker/run/config/standalone/files-standalone.json b/Docker/run/config/standalone/files-standalone.json index 4b857de..222a6d1 100644 --- a/Docker/run/config/standalone/files-standalone.json +++ b/Docker/run/config/standalone/files-standalone.json @@ -34,7 +34,8 @@ "UnusedFileRetentionPeriodInDays": 14, "CacheDirectory": "/marecache/", "RemoteCacheSourceUri": "", - "MainServerGrpcAddress": "http://mare-server:6005" + "MainServerGrpcAddress": "http://mare-server:6005", + "RedisConnectionString": "redis,password=secretredispassword" }, "AllowedHosts": "*", "Kestrel": { diff --git a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.ClientStubs.cs b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.ClientStubs.cs index 14ea0b3..684fe49 100644 --- a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.ClientStubs.cs +++ b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.ClientStubs.cs @@ -63,5 +63,10 @@ namespace MareSynchronosServer.Hubs { throw new PlatformNotSupportedException("Calling clientside method on server not supported"); } + + public Task Client_DownloadReady(Guid requestId) + { + throw new PlatformNotSupportedException("Calling clientside method on server not supported"); + } } } diff --git a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.cs b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.cs index cd06ae8..1bd6aff 100644 --- a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.cs +++ b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.cs @@ -83,15 +83,6 @@ public partial class MareHub : Hub, IMareHub }; } - [Authorize(Policy = "Authenticated")] - public async Task Heartbeat(string characterIdentification) - { - return new ConnectionDto() - { - ServerVersion = IMareHub.ApiVersion, - }; - } - [Authorize(Policy = "Authenticated")] public async Task CheckClientHealth() { diff --git a/MareSynchronosServer/MareSynchronosServer/Startup.cs b/MareSynchronosServer/MareSynchronosServer/Startup.cs index a60a682..bb31454 100644 --- a/MareSynchronosServer/MareSynchronosServer/Startup.cs +++ b/MareSynchronosServer/MareSynchronosServer/Startup.cs @@ -10,7 +10,6 @@ using MareSynchronosShared.Protos; using Grpc.Net.Client.Configuration; using MareSynchronosShared.Metrics; using MareSynchronosServer.Services; -using MareSynchronosServer.Utils; using MareSynchronosServer.RequirementHandlers; using MareSynchronosShared.Utils; using MareSynchronosShared.Services; @@ -80,7 +79,6 @@ public class Startup services.AddSingleton(); services.AddSingleton(); - services.AddSingleton(); services.AddHostedService(provider => provider.GetService()); // configure services based on main server status ConfigureServicesBasedOnShardType(services, mareConfig, isMainServer); @@ -94,6 +92,8 @@ public class Startup private static void ConfigureSignalR(IServiceCollection services, IConfigurationSection mareConfig) { + services.AddSingleton(); + var signalRServiceBuilder = services.AddSignalR(hubOptions => { hubOptions.MaximumReceiveMessageSize = long.MaxValue; @@ -106,13 +106,7 @@ public class Startup // configure redis for SignalR var redisConnection = mareConfig.GetValue(nameof(ServerConfiguration.RedisConnectionString), string.Empty); - if (!string.IsNullOrEmpty(redisConnection)) - { - signalRServiceBuilder.AddStackExchangeRedis(redisConnection, options => - { - options.Configuration.ChannelPrefix = "MareSynchronos"; - }); - } + signalRServiceBuilder.AddStackExchangeRedis(redisConnection, options => { }); var options = ConfigurationOptions.Parse(redisConnection); diff --git a/MareSynchronosServer/MareSynchronosServer/Utils/IdBasedUserIdProvider.cs b/MareSynchronosServer/MareSynchronosShared/Utils/IdBasedUserIdProvider.cs similarity index 70% rename from MareSynchronosServer/MareSynchronosServer/Utils/IdBasedUserIdProvider.cs rename to MareSynchronosServer/MareSynchronosShared/Utils/IdBasedUserIdProvider.cs index d8d9cd0..45d592f 100644 --- a/MareSynchronosServer/MareSynchronosServer/Utils/IdBasedUserIdProvider.cs +++ b/MareSynchronosServer/MareSynchronosShared/Utils/IdBasedUserIdProvider.cs @@ -1,7 +1,6 @@ -using MareSynchronosShared.Utils; -using Microsoft.AspNetCore.SignalR; +using Microsoft.AspNetCore.SignalR; -namespace MareSynchronosServer.Utils; +namespace MareSynchronosShared.Utils; public class IdBasedUserIdProvider : IUserIdProvider { diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/RequestController.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/RequestController.cs index 78e1705..245c531 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/RequestController.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/RequestController.cs @@ -2,7 +2,6 @@ using MareSynchronosShared.Utils; using MareSynchronosStaticFilesServer.Services; using Microsoft.AspNetCore.Mvc; -using System.Text.Json; namespace MareSynchronosStaticFilesServer.Controllers; @@ -20,6 +19,23 @@ public class RequestController : ControllerBase _requestQueue = requestQueue; } + [HttpGet] + [Route(MareFiles.Request_Cancel)] + public async Task CancelQueueRequest(Guid requestId) + { + try + { + await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted); + _requestQueue.RemoveFromQueue(requestId, MareUser); + return Ok(); + } + catch (OperationCanceledException) { return BadRequest(); } + finally + { + _parallelRequestSemaphore.Release(); + } + } + [HttpPost] [Route(MareFiles.Request_Enqueue)] public async Task PreRequestFilesAsync([FromBody] List files) @@ -51,34 +67,8 @@ public class RequestController : ControllerBase await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted); Guid g = Guid.NewGuid(); _cachedFileProvider.DownloadFileWhenRequired(file, Authorization); - var queueStatus = await _requestQueue.EnqueueUser(new(g, MareUser, file)); - return Ok(JsonSerializer.Serialize(new QueueRequestDto(g, queueStatus))); - } - catch (OperationCanceledException) { return BadRequest(); } - finally - { - _parallelRequestSemaphore.Release(); - } - } - - [HttpGet] - [Route(MareFiles.Request_CheckQueue)] - public async Task CheckQueueAsync(Guid requestId) - { - try - { - await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted); - if (_requestQueue.IsActiveProcessing(requestId, MareUser, out _)) - { - return Ok(); - } - - if (_requestQueue.StillEnqueued(requestId, MareUser)) - { - return Conflict(); - } - - return BadRequest(); + await _requestQueue.EnqueueUser(new(g, MareUser, file)); + return Ok(g); } catch (OperationCanceledException) { return BadRequest(); } finally diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/ServerFilesController.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/ServerFilesController.cs index 1e7cf98..5a19f33 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/ServerFilesController.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/ServerFilesController.cs @@ -18,7 +18,7 @@ public class ServerFilesController : ControllerBase [HttpGet(MareFiles.ServerFiles_Get + "/{fileId}")] [Authorize(Policy = "Internal")] - public async Task GetFile(string fileId) + public IActionResult GetFile(string fileId) { _logger.LogInformation($"GetFile:{MareUser}:{fileId}"); diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/DummyHub.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/DummyHub.cs new file mode 100644 index 0000000..afcb665 --- /dev/null +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/DummyHub.cs @@ -0,0 +1,25 @@ +using Microsoft.AspNetCore.SignalR; + +// this is a very hacky way to attach this file server to the main mare hub signalr instance via redis +// signalr publishes the namespace and hubname into the redis backend so this needs to be equal to the original +// but I don't need to reimplement the hub completely as I only exclusively use it for internal connection calling +// from the queue service so I keep the namespace and name of the class the same so it can connect to the same channel +// if anyone finds a better way to do this let me know + +#pragma warning disable IDE0130 // Namespace does not match folder structure +#pragma warning disable MA0048 // File name must match type name +namespace MareSynchronosServer.Hubs; +public class MareHub : Hub +{ + public override Task OnConnectedAsync() + { + throw new NotSupportedException(); + } + + public override Task OnDisconnectedAsync(Exception exception) + { + throw new NotSupportedException(); + } +} +#pragma warning restore IDE0130 // Namespace does not match folder structure +#pragma warning restore MA0048 // File name must match type name \ No newline at end of file diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/MareSynchronosStaticFilesServer.csproj b/MareSynchronosServer/MareSynchronosStaticFilesServer/MareSynchronosStaticFilesServer.csproj index c2fbf31..68c7845 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/MareSynchronosStaticFilesServer.csproj +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/MareSynchronosStaticFilesServer.csproj @@ -25,6 +25,7 @@ runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs index 2fb456e..96defb4 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/RequestQueueService.cs @@ -2,6 +2,7 @@ using MareSynchronosShared.Metrics; using MareSynchronosShared.Services; using MareSynchronosStaticFilesServer.Utils; +using Microsoft.AspNetCore.SignalR; using System.Collections.Concurrent; using System.Timers; @@ -13,46 +14,45 @@ public class RequestQueueService : IHostedService private readonly ConcurrentQueue _queue = new(); private readonly MareMetrics _metrics; private readonly ILogger _logger; + private readonly IHubContext _hubContext; private readonly int _queueExpirationSeconds; - private SemaphoreSlim _queueSemaphore = new(1); - private SemaphoreSlim _queueProcessingSemaphore = new(1); + private readonly SemaphoreSlim _queueSemaphore = new(1); + private readonly SemaphoreSlim _queueProcessingSemaphore = new(1); private System.Timers.Timer _queueTimer; + private readonly ConcurrentDictionary _queueRemoval = new(); - public RequestQueueService(MareMetrics metrics, IConfigurationService configurationService, ILogger logger) + public RequestQueueService(MareMetrics metrics, IConfigurationService configurationService, ILogger logger, IHubContext hubContext) { _userQueueRequests = new UserQueueEntry[configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueSize), 50)]; _queueExpirationSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadTimeoutSeconds), 5); _metrics = metrics; _logger = logger; + _hubContext = hubContext; } - public async Task EnqueueUser(UserRequest request) + public async Task EnqueueUser(UserRequest request) { _logger.LogDebug("Enqueueing req {guid} from {user} for {file}", request.RequestId, request.User, request.FileId); if (_queueProcessingSemaphore.CurrentCount == 0) { _queue.Enqueue(request); - return QueueStatus.Waiting; } try { await _queueSemaphore.WaitAsync().ConfigureAwait(false); - QueueStatus status = QueueStatus.Waiting; var idx = Array.FindIndex(_userQueueRequests, r => r == null); if (idx == -1) { _queue.Enqueue(request); - status = QueueStatus.Waiting; } else { - DequeueIntoSlot(request, idx); - status = QueueStatus.Ready; + await DequeueIntoSlotAsync(request, idx).ConfigureAwait(false); } - return status; + return; } catch (Exception ex) { @@ -66,22 +66,39 @@ public class RequestQueueService : IHostedService throw new Exception("Error during EnqueueUser"); } + public void RemoveFromQueue(Guid requestId, string user) + { + if (!_queue.Any(f => f.RequestId == requestId && string.Equals(f.User, user, StringComparison.Ordinal))) + { + var activeSlot = _userQueueRequests.FirstOrDefault(r => r != null && string.Equals(r.UserRequest.User, user, StringComparison.Ordinal) && r.UserRequest.RequestId == requestId); + if (activeSlot != null) + { + var idx = Array.IndexOf(_userQueueRequests, activeSlot); + if (idx >= 0) + { + _userQueueRequests[idx] = null; + return; + } + } + } + _queueRemoval[requestId] = user; + } + public bool StillEnqueued(Guid request, string user) { - return _queue.FirstOrDefault(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal)) != null; + return _queue.Any(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal)); } public bool IsActiveProcessing(Guid request, string user, out UserRequest userRequest) { - var userQueueRequest = _userQueueRequests.Where(u => u != null) - .FirstOrDefault(f => f.UserRequest.RequestId == request && string.Equals(f.UserRequest.User, user, StringComparison.Ordinal)); + var userQueueRequest = _userQueueRequests.FirstOrDefault(u => u != null && u.UserRequest.RequestId == request && string.Equals(u.UserRequest.User, user, StringComparison.Ordinal)); userRequest = userQueueRequest?.UserRequest ?? null; return userQueueRequest != null && userRequest != null && userQueueRequest.ExpirationDate > DateTime.UtcNow; } public void FinishRequest(Guid request) { - var req = _userQueueRequests.Where(f => f != null).First(f => f.UserRequest.RequestId == request); + var req = _userQueueRequests.First(f => f != null && f.UserRequest.RequestId == request); var idx = Array.IndexOf(_userQueueRequests, req); _logger.LogDebug("Finishing Request {guid}, clearing slot {idx}", request, idx); _userQueueRequests[idx] = null; @@ -90,7 +107,7 @@ public class RequestQueueService : IHostedService public void ActivateRequest(Guid request) { _logger.LogDebug("Activating request {guid}", request); - _userQueueRequests.Where(f => f != null).First(f => f.UserRequest.RequestId == request).IsActive = true; + _userQueueRequests.First(f => f != null && f.UserRequest.RequestId == request).IsActive = true; } private async void ProcessQueue(object src, ElapsedEventArgs e) @@ -103,9 +120,9 @@ public class RequestQueueService : IHostedService { Parallel.For(0, _userQueueRequests.Length, new ParallelOptions() { - MaxDegreeOfParallelism = 10 + MaxDegreeOfParallelism = 10, }, - (i) => + async (i) => { if (!_queue.Any()) return; @@ -113,9 +130,25 @@ public class RequestQueueService : IHostedService if (_userQueueRequests[i] == null) { - if (_queue.TryDequeue(out var request)) + bool enqueued = false; + while (!enqueued) { - DequeueIntoSlot(request, i); + if (_queue.TryDequeue(out var request)) + { + if (_queueRemoval.TryGetValue(request.RequestId, out string user) && string.Equals(user, request.User, StringComparison.Ordinal)) + { + _logger.LogDebug("Request cancelled: {requestId} by {user}", request.RequestId, user); + _queueRemoval.Remove(request.RequestId, out _); + continue; + } + + await DequeueIntoSlotAsync(request, i).ConfigureAwait(false); + enqueued = true; + } + else + { + enqueued = true; + } } } }); @@ -133,10 +166,11 @@ public class RequestQueueService : IHostedService _metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueue, _queue.Count); } - private void DequeueIntoSlot(UserRequest userRequest, int slot) + private async Task DequeueIntoSlotAsync(UserRequest userRequest, int slot) { _logger.LogDebug("Dequeueing {req} into {i}: {user} with {file}", userRequest.RequestId, slot, userRequest.User, userRequest.FileId); _userQueueRequests[slot] = new(userRequest, DateTime.UtcNow.AddSeconds(_queueExpirationSeconds)); + await _hubContext.Clients.User(userRequest.User).SendAsync(nameof(IMareHub.Client_DownloadReady), userRequest.RequestId).ConfigureAwait(false); } public Task StartAsync(CancellationToken cancellationToken) diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Startup.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Startup.cs index 35a1273..359dd92 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Startup.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Startup.cs @@ -1,5 +1,6 @@ using Grpc.Net.Client.Configuration; using Grpc.Net.ClientFactory; +using MareSynchronos.API; using MareSynchronosShared.Data; using MareSynchronosShared.Metrics; using MareSynchronosShared.Protos; @@ -10,10 +11,12 @@ using MareSynchronosStaticFilesServer.Utils; using Microsoft.AspNetCore.Authentication.JwtBearer; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Server.Kestrel.Core; +using Microsoft.AspNetCore.SignalR; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; using Microsoft.IdentityModel.Tokens; using Prometheus; +using StackExchange.Redis; using System.Text; namespace MareSynchronosStaticFilesServer; @@ -162,6 +165,19 @@ public class Startup services.AddHostedService(p => (MareConfigurationServiceClient)p.GetService>()); + services.AddSingleton(); + var signalRServiceBuilder = services.AddSignalR(hubOptions => + { + hubOptions.MaximumReceiveMessageSize = long.MaxValue; + hubOptions.EnableDetailedErrors = true; + hubOptions.MaximumParallelInvocationsPerClient = 10; + hubOptions.StreamBufferCapacity = 200; + }); + + // configure redis for SignalR + var redisConnection = mareConfig.GetValue(nameof(ServerConfiguration.RedisConnectionString), string.Empty); + signalRServiceBuilder.AddStackExchangeRedis(redisConnection, options => { }); + services.AddHealthChecks(); services.AddControllers(); } @@ -188,6 +204,7 @@ public class Startup { e.MapGrpcService(); } + e.MapHub("/dummyhub"); e.MapControllers(); e.MapHealthChecks("/health").WithMetadata(new AllowAnonymousAttribute()); }); diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/RequestFileStreamResult.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/RequestFileStreamResult.cs index c626f2f..a470263 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/RequestFileStreamResult.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Utils/RequestFileStreamResult.cs @@ -35,25 +35,43 @@ public class RequestFileStreamResult : FileStreamResult public override void ExecuteResult(ActionContext context) { - base.ExecuteResult(context); + try + { + base.ExecuteResult(context); + } + catch + { + throw; + } + finally + { + _releaseCts.Cancel(); - _releaseCts.Cancel(); + if (!_releasedSlot) + _requestQueueService.FinishRequest(_requestId); - if (!_releasedSlot) - _requestQueueService.FinishRequest(_requestId); - - _mareMetrics.DecGauge(MetricsAPI.GaugeCurrentDownloads); + _mareMetrics.DecGauge(MetricsAPI.GaugeCurrentDownloads); + } } public override async Task ExecuteResultAsync(ActionContext context) { - await base.ExecuteResultAsync(context).ConfigureAwait(false); + try + { + await base.ExecuteResultAsync(context).ConfigureAwait(false); + } + catch + { + throw; + } + finally + { + _releaseCts.Cancel(); - _releaseCts.Cancel(); + if (!_releasedSlot) + _requestQueueService.FinishRequest(_requestId); - if (!_releasedSlot) - _requestQueueService.FinishRequest(_requestId); - - _mareMetrics.DecGauge(MetricsAPI.GaugeCurrentDownloads); + _mareMetrics.DecGauge(MetricsAPI.GaugeCurrentDownloads); + } } } \ No newline at end of file