[Draft] Update 0.8 (#25)
* get rid of file handling through grpc and signalr * fix upload on controller * adapt usersetpairpermissions * send user perms * server-side fixes * rework file upload * adjust log level to debug in docker standalone json * update dependencies --------- Co-authored-by: rootdarkarchon <root.darkarchon@outlook.com>
This commit is contained in:
@@ -60,18 +60,19 @@ public class RequestController : ControllerBase
|
||||
[Route(MareFiles.Request_RequestFile)]
|
||||
public async Task<IActionResult> RequestFile(string file)
|
||||
{
|
||||
Guid g = Guid.NewGuid();
|
||||
|
||||
try
|
||||
{
|
||||
await _parallelRequestSemaphore.WaitAsync(HttpContext.RequestAborted);
|
||||
Guid g = Guid.NewGuid();
|
||||
_cachedFileProvider.DownloadFileWhenRequired(file);
|
||||
await _requestQueue.EnqueueUser(new(g, MareUser, file));
|
||||
return Ok(g);
|
||||
}
|
||||
catch (OperationCanceledException) { return BadRequest(); }
|
||||
finally
|
||||
{
|
||||
_parallelRequestSemaphore.Release();
|
||||
await _requestQueue.EnqueueUser(new(g, MareUser, file));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,18 +1,146 @@
|
||||
using MareSynchronos.API.Routes;
|
||||
using LZ4;
|
||||
using MareSynchronos.API.Dto.Files;
|
||||
using MareSynchronos.API.Routes;
|
||||
using MareSynchronos.API.SignalR;
|
||||
using MareSynchronosServer.Hubs;
|
||||
using MareSynchronosShared.Data;
|
||||
using MareSynchronosShared.Metrics;
|
||||
using MareSynchronosShared.Models;
|
||||
using MareSynchronosShared.Services;
|
||||
using MareSynchronosShared.Utils;
|
||||
using MareSynchronosStaticFilesServer.Services;
|
||||
using MareSynchronosStaticFilesServer.Utils;
|
||||
using Microsoft.AspNetCore.Authorization;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text.Json;
|
||||
using System.Text.RegularExpressions;
|
||||
|
||||
namespace MareSynchronosStaticFilesServer.Controllers;
|
||||
|
||||
[Route(MareFiles.ServerFiles)]
|
||||
public class ServerFilesController : ControllerBase
|
||||
{
|
||||
private static readonly ConcurrentDictionary<string, SemaphoreSlim> _fileUploadLocks = new(StringComparer.Ordinal);
|
||||
private readonly string _basePath;
|
||||
private readonly CachedFileProvider _cachedFileProvider;
|
||||
private readonly IConfigurationService<StaticFilesServerConfiguration> _configuration;
|
||||
private readonly IHubContext<MareHub> _hubContext;
|
||||
private readonly MareDbContext _mareDbContext;
|
||||
private readonly MareMetrics _metricsClient;
|
||||
|
||||
public ServerFilesController(ILogger<ServerFilesController> logger, CachedFileProvider cachedFileProvider) : base(logger)
|
||||
public ServerFilesController(ILogger<ServerFilesController> logger, CachedFileProvider cachedFileProvider,
|
||||
IConfigurationService<StaticFilesServerConfiguration> configuration,
|
||||
IHubContext<MareSynchronosServer.Hubs.MareHub> hubContext,
|
||||
MareDbContext mareDbContext, MareMetrics metricsClient) : base(logger)
|
||||
{
|
||||
_basePath = configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
|
||||
_cachedFileProvider = cachedFileProvider;
|
||||
_configuration = configuration;
|
||||
_hubContext = hubContext;
|
||||
_mareDbContext = mareDbContext;
|
||||
_metricsClient = metricsClient;
|
||||
}
|
||||
|
||||
[HttpPost(MareFiles.ServerFiles_DeleteAll)]
|
||||
public async Task<IActionResult> FilesDeleteAll()
|
||||
{
|
||||
var ownFiles = await _mareDbContext.Files.Where(f => f.Uploaded && f.Uploader.UID == MareUser).ToListAsync().ConfigureAwait(false);
|
||||
|
||||
foreach (var dbFile in ownFiles)
|
||||
{
|
||||
var fi = FilePathUtil.GetFileInfoForHash(_basePath, dbFile.Hash);
|
||||
if (fi != null)
|
||||
{
|
||||
_metricsClient.DecGauge(MetricsAPI.GaugeFilesTotal, fi == null ? 0 : 1);
|
||||
_metricsClient.DecGauge(MetricsAPI.GaugeFilesTotalSize, fi?.Length ?? 0);
|
||||
|
||||
fi?.Delete();
|
||||
}
|
||||
}
|
||||
|
||||
_mareDbContext.Files.RemoveRange(ownFiles);
|
||||
await _mareDbContext.SaveChangesAsync().ConfigureAwait(false);
|
||||
|
||||
return Ok();
|
||||
}
|
||||
|
||||
[HttpGet(MareFiles.ServerFiles_GetSizes)]
|
||||
public async Task<IActionResult> FilesGetSizes([FromBody] List<string> hashes)
|
||||
{
|
||||
var allFiles = await _mareDbContext.Files.Where(f => hashes.Contains(f.Hash)).ToListAsync().ConfigureAwait(false);
|
||||
var forbiddenFiles = await _mareDbContext.ForbiddenUploadEntries.
|
||||
Where(f => hashes.Contains(f.Hash)).ToListAsync().ConfigureAwait(false);
|
||||
List<DownloadFileDto> response = new();
|
||||
|
||||
var cacheFile = await _mareDbContext.Files.AsNoTracking().Where(f => hashes.Contains(f.Hash)).AsNoTracking().Select(k => new { k.Hash, k.Size }).AsNoTracking().ToListAsync().ConfigureAwait(false);
|
||||
|
||||
var shardConfig = new List<CdnShardConfiguration>(_configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.CdnShardConfiguration), new List<CdnShardConfiguration>()));
|
||||
|
||||
foreach (var file in cacheFile)
|
||||
{
|
||||
var forbiddenFile = forbiddenFiles.SingleOrDefault(f => string.Equals(f.Hash, file.Hash, StringComparison.OrdinalIgnoreCase));
|
||||
|
||||
var matchedShardConfig = shardConfig.OrderBy(g => Guid.NewGuid()).FirstOrDefault(f => new Regex(f.FileMatch).IsMatch(file.Hash));
|
||||
var baseUrl = matchedShardConfig?.CdnFullUrl ?? _configuration.GetValue<Uri>(nameof(StaticFilesServerConfiguration.CdnFullUrl));
|
||||
|
||||
response.Add(new DownloadFileDto
|
||||
{
|
||||
FileExists = file.Size > 0,
|
||||
ForbiddenBy = forbiddenFile?.ForbiddenBy ?? string.Empty,
|
||||
IsForbidden = forbiddenFile != null,
|
||||
Hash = file.Hash,
|
||||
Size = file.Size,
|
||||
Url = baseUrl.ToString(),
|
||||
});
|
||||
}
|
||||
|
||||
return Ok(JsonSerializer.Serialize(response));
|
||||
}
|
||||
|
||||
[HttpPost(MareFiles.ServerFiles_FilesSend)]
|
||||
public async Task<IActionResult> FilesSend([FromBody] FilesSendDto filesSendDto)
|
||||
{
|
||||
var userSentHashes = new HashSet<string>(filesSendDto.FileHashes.Distinct(StringComparer.Ordinal).Select(s => string.Concat(s.Where(c => char.IsLetterOrDigit(c)))), StringComparer.Ordinal);
|
||||
var notCoveredFiles = new Dictionary<string, UploadFileDto>(StringComparer.Ordinal);
|
||||
var forbiddenFiles = await _mareDbContext.ForbiddenUploadEntries.AsNoTracking().Where(f => userSentHashes.Contains(f.Hash)).AsNoTracking().ToDictionaryAsync(f => f.Hash, f => f).ConfigureAwait(false);
|
||||
var existingFiles = await _mareDbContext.Files.AsNoTracking().Where(f => userSentHashes.Contains(f.Hash)).AsNoTracking().ToDictionaryAsync(f => f.Hash, f => f).ConfigureAwait(false);
|
||||
|
||||
List<FileCache> fileCachesToUpload = new();
|
||||
foreach (var hash in userSentHashes)
|
||||
{
|
||||
// Skip empty file hashes, duplicate file hashes, forbidden file hashes and existing file hashes
|
||||
if (string.IsNullOrEmpty(hash)) { continue; }
|
||||
if (notCoveredFiles.ContainsKey(hash)) { continue; }
|
||||
if (forbiddenFiles.ContainsKey(hash))
|
||||
{
|
||||
notCoveredFiles[hash] = new UploadFileDto()
|
||||
{
|
||||
ForbiddenBy = forbiddenFiles[hash].ForbiddenBy,
|
||||
Hash = hash,
|
||||
IsForbidden = true,
|
||||
};
|
||||
|
||||
continue;
|
||||
}
|
||||
if (existingFiles.TryGetValue(hash, out var file) && file.Uploaded) { continue; }
|
||||
|
||||
notCoveredFiles[hash] = new UploadFileDto()
|
||||
{
|
||||
Hash = hash,
|
||||
};
|
||||
}
|
||||
|
||||
if (notCoveredFiles.Any())
|
||||
{
|
||||
await _hubContext.Clients.Users(filesSendDto.UIDs).SendAsync(nameof(IMareHub.Client_UserReceiveUploadStatus), new MareSynchronos.API.Dto.User.UserDto(new(MareUser)))
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
return Ok(JsonSerializer.Serialize(notCoveredFiles.Values.ToList()));
|
||||
}
|
||||
|
||||
[HttpGet(MareFiles.ServerFiles_Get + "/{fileId}")]
|
||||
@@ -26,4 +154,93 @@ public class ServerFilesController : ControllerBase
|
||||
|
||||
return File(fs, "application/octet-stream");
|
||||
}
|
||||
}
|
||||
|
||||
[HttpPost(MareFiles.ServerFiles_Upload + "/{hash}")]
|
||||
[RequestSizeLimit(200 * 1024 * 1024)]
|
||||
public async Task<IActionResult> UploadFile(string hash, CancellationToken requestAborted)
|
||||
{
|
||||
_logger.LogInformation("{user} uploading file {file}", MareUser, hash);
|
||||
bool initiated = false;
|
||||
hash = hash.ToUpperInvariant();
|
||||
var existingFile = await _mareDbContext.Files.SingleOrDefaultAsync(f => f.Hash == hash);
|
||||
if (existingFile != null) return Ok();
|
||||
|
||||
if (!_fileUploadLocks.TryGetValue(hash, out var fileLock))
|
||||
{
|
||||
initiated = true;
|
||||
_fileUploadLocks[hash] = fileLock = new SemaphoreSlim(1);
|
||||
await fileLock.WaitAsync(requestAborted).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
if (!initiated)
|
||||
{
|
||||
try
|
||||
{
|
||||
await fileLock.WaitAsync(requestAborted).ConfigureAwait(false);
|
||||
var file = await _mareDbContext.Files.SingleOrDefaultAsync(c => c.Hash == hash).ConfigureAwait(false);
|
||||
if (file != null)
|
||||
{
|
||||
fileLock.Release();
|
||||
return Ok();
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
fileLock.Release();
|
||||
return Ok();
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
// copy the request body to memory
|
||||
using var compressedFileStream = new MemoryStream();
|
||||
await Request.Body.CopyToAsync(compressedFileStream, requestAborted).ConfigureAwait(false);
|
||||
|
||||
// decompress and copy the decompressed stream to memory
|
||||
var data = LZ4Codec.Unwrap(compressedFileStream.ToArray());
|
||||
|
||||
// reset streams
|
||||
compressedFileStream.Seek(0, SeekOrigin.Begin);
|
||||
|
||||
// compute hash to verify
|
||||
var hashString = BitConverter.ToString(SHA1.HashData(data))
|
||||
.Replace("-", "", StringComparison.Ordinal).ToUpperInvariant();
|
||||
if (!string.Equals(hashString, hash, StringComparison.Ordinal))
|
||||
throw new InvalidOperationException($"Hash does not match file, computed: {hashString}, expected: {hash}");
|
||||
|
||||
// save file
|
||||
var path = FilePathUtil.GetFilePath(_basePath, hash);
|
||||
using var fileStream = new FileStream(path, FileMode.Create);
|
||||
await compressedFileStream.CopyToAsync(fileStream).ConfigureAwait(false);
|
||||
|
||||
// update on db
|
||||
await _mareDbContext.Files.AddAsync(new FileCache()
|
||||
{
|
||||
Hash = hash,
|
||||
UploadDate = DateTime.UtcNow,
|
||||
UploaderUID = MareUser,
|
||||
Size = compressedFileStream.Length,
|
||||
Uploaded = true
|
||||
}).ConfigureAwait(false);
|
||||
await _mareDbContext.SaveChangesAsync().ConfigureAwait(false);
|
||||
|
||||
_metricsClient.IncGauge(MetricsAPI.GaugeFilesTotal, 1);
|
||||
_metricsClient.IncGauge(MetricsAPI.GaugeFilesTotalSize, compressedFileStream.Length);
|
||||
|
||||
_fileUploadLocks.Remove(hash, out _);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_logger.LogError(e, "Error during file upload");
|
||||
return BadRequest();
|
||||
}
|
||||
finally
|
||||
{
|
||||
fileLock.Release();
|
||||
_fileUploadLocks.TryRemove(hash, out _);
|
||||
}
|
||||
|
||||
return Ok();
|
||||
}
|
||||
}
|
||||
@@ -18,17 +18,15 @@
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Grpc.AspNetCore" Version="2.51.0" />
|
||||
<PackageReference Include="Grpc.Net.Client" Version="2.51.0" />
|
||||
<PackageReference Include="Meziantou.Analyzer" Version="2.0.14">
|
||||
<PackageReference Include="lz4net" Version="1.0.15.93" />
|
||||
<PackageReference Include="Meziantou.Analyzer" Version="2.0.19">
|
||||
<PrivateAssets>all</PrivateAssets>
|
||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
||||
</PackageReference>
|
||||
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="7.0.2" />
|
||||
<PackageReference Include="Microsoft.AspNetCore.SignalR.Protocols.MessagePack" Version="7.0.2" />
|
||||
<PackageReference Include="Microsoft.AspNetCore.SignalR.StackExchangeRedis" Version="7.0.2" />
|
||||
<PackageReference Include="Microsoft.AspNetCore.SignalR.Protocols.MessagePack" Version="7.0.4" />
|
||||
<PackageReference Include="Microsoft.AspNetCore.SignalR.StackExchangeRedis" Version="7.0.4" />
|
||||
<PackageReference Include="Microsoft.Extensions.Hosting.Systemd" Version="7.0.0" />
|
||||
<PackageReference Include="prometheus-net.AspNetCore" Version="7.0.0" />
|
||||
<PackageReference Include="prometheus-net.AspNetCore" Version="8.0.0" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
||||
@@ -1,105 +0,0 @@
|
||||
using Grpc.Core;
|
||||
using MareSynchronosShared.Data;
|
||||
using MareSynchronosShared.Metrics;
|
||||
using MareSynchronosShared.Protos;
|
||||
using MareSynchronosShared.Services;
|
||||
using MareSynchronosStaticFilesServer.Utils;
|
||||
using Microsoft.AspNetCore.Authorization;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
|
||||
namespace MareSynchronosStaticFilesServer.Services;
|
||||
|
||||
[Authorize(Policy = "Internal")]
|
||||
public class GrpcFileService : FileService.FileServiceBase
|
||||
{
|
||||
private readonly string _basePath;
|
||||
private readonly MareDbContext _mareDbContext;
|
||||
private readonly ILogger<GrpcFileService> _logger;
|
||||
private readonly MareMetrics _metricsClient;
|
||||
|
||||
public GrpcFileService(MareDbContext mareDbContext, IConfigurationService<StaticFilesServerConfiguration> configuration, ILogger<GrpcFileService> logger, MareMetrics metricsClient)
|
||||
{
|
||||
_basePath = configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
|
||||
_mareDbContext = mareDbContext;
|
||||
_logger = logger;
|
||||
_metricsClient = metricsClient;
|
||||
}
|
||||
|
||||
[Authorize(Policy = "Internal")]
|
||||
public override async Task<Empty> UploadFile(IAsyncStreamReader<UploadFileRequest> requestStream, ServerCallContext context)
|
||||
{
|
||||
_ = await requestStream.MoveNext().ConfigureAwait(false);
|
||||
var uploadMsg = requestStream.Current;
|
||||
var filePath = FilePathUtil.GetFilePath(_basePath, uploadMsg.Hash);
|
||||
using var fileWriter = File.OpenWrite(filePath);
|
||||
var file = await _mareDbContext.Files.SingleOrDefaultAsync(f => f.Hash == uploadMsg.Hash && f.UploaderUID == uploadMsg.Uploader).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
if (file != null)
|
||||
{
|
||||
await fileWriter.WriteAsync(uploadMsg.FileData.ToArray()).ConfigureAwait(false);
|
||||
|
||||
while (await requestStream.MoveNext().ConfigureAwait(false))
|
||||
{
|
||||
await fileWriter.WriteAsync(requestStream.Current.FileData.ToArray()).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
await fileWriter.FlushAsync().ConfigureAwait(false);
|
||||
fileWriter.Close();
|
||||
|
||||
var fileSize = new FileInfo(filePath).Length;
|
||||
file.Uploaded = true;
|
||||
file.Size = fileSize;
|
||||
|
||||
await _mareDbContext.SaveChangesAsync().ConfigureAwait(false);
|
||||
|
||||
_metricsClient.IncGauge(MetricsAPI.GaugeFilesTotal, 1);
|
||||
_metricsClient.IncGauge(MetricsAPI.GaugeFilesTotalSize, fileSize);
|
||||
|
||||
_logger.LogInformation("User {user} uploaded file {hash}", uploadMsg.Uploader, uploadMsg.Hash);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Error during UploadFile");
|
||||
var fileNew = await _mareDbContext.Files.SingleOrDefaultAsync(f => f.Hash == uploadMsg.Hash && f.UploaderUID == uploadMsg.Uploader).ConfigureAwait(false);
|
||||
if (fileNew != null)
|
||||
{
|
||||
_mareDbContext.Files.Remove(fileNew);
|
||||
}
|
||||
|
||||
await _mareDbContext.SaveChangesAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
return new Empty();
|
||||
}
|
||||
|
||||
[Authorize(Policy = "Internal")]
|
||||
public override async Task<Empty> DeleteFiles(DeleteFilesRequest request, ServerCallContext context)
|
||||
{
|
||||
foreach (var hash in request.Hash)
|
||||
{
|
||||
try
|
||||
{
|
||||
var fi = FilePathUtil.GetFileInfoForHash(_basePath, hash);
|
||||
var file = await _mareDbContext.Files.SingleOrDefaultAsync(f => f.Hash == hash).ConfigureAwait(false);
|
||||
if (file != null && fi != null)
|
||||
{
|
||||
_mareDbContext.Files.Remove(file);
|
||||
await _mareDbContext.SaveChangesAsync().ConfigureAwait(false);
|
||||
|
||||
_metricsClient.DecGauge(MetricsAPI.GaugeFilesTotal, fi == null ? 0 : 1);
|
||||
_metricsClient.DecGauge(MetricsAPI.GaugeFilesTotalSize, fi?.Length ?? 0);
|
||||
|
||||
fi?.Delete();
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Could not delete file for hash {hash}", hash);
|
||||
}
|
||||
}
|
||||
|
||||
return new Empty();
|
||||
}
|
||||
}
|
||||
@@ -10,17 +10,17 @@ namespace MareSynchronosStaticFilesServer.Services;
|
||||
|
||||
public class RequestQueueService : IHostedService
|
||||
{
|
||||
private readonly UserQueueEntry[] _userQueueRequests;
|
||||
private readonly ConcurrentQueue<UserRequest> _queue = new();
|
||||
private readonly MareMetrics _metrics;
|
||||
private readonly ILogger<RequestQueueService> _logger;
|
||||
private readonly IHubContext<MareSynchronosServer.Hubs.MareHub> _hubContext;
|
||||
private readonly ILogger<RequestQueueService> _logger;
|
||||
private readonly MareMetrics _metrics;
|
||||
private readonly ConcurrentQueue<UserRequest> _queue = new();
|
||||
private readonly int _queueExpirationSeconds;
|
||||
private readonly SemaphoreSlim _queueSemaphore = new(1);
|
||||
private readonly SemaphoreSlim _queueProcessingSemaphore = new(1);
|
||||
private readonly ConcurrentDictionary<Guid, string> _queueRemoval = new();
|
||||
private readonly SemaphoreSlim _queueSemaphore = new(1);
|
||||
private readonly UserQueueEntry[] _userQueueRequests;
|
||||
private int _queueLimitForReset;
|
||||
private System.Timers.Timer _queueTimer;
|
||||
private readonly ConcurrentDictionary<Guid, string> _queueRemoval = new();
|
||||
|
||||
public RequestQueueService(MareMetrics metrics, IConfigurationService<StaticFilesServerConfiguration> configurationService, ILogger<RequestQueueService> logger, IHubContext<MareSynchronosServer.Hubs.MareHub> hubContext)
|
||||
{
|
||||
@@ -32,6 +32,12 @@ public class RequestQueueService : IHostedService
|
||||
_hubContext = hubContext;
|
||||
}
|
||||
|
||||
public void ActivateRequest(Guid request)
|
||||
{
|
||||
_logger.LogDebug("Activating request {guid}", request);
|
||||
_userQueueRequests.First(f => f != null && f.UserRequest.RequestId == request).IsActive = true;
|
||||
}
|
||||
|
||||
public async Task EnqueueUser(UserRequest request)
|
||||
{
|
||||
_logger.LogDebug("Enqueueing req {guid} from {user} for {file}", request.RequestId, request.User, request.FileId);
|
||||
@@ -39,20 +45,13 @@ public class RequestQueueService : IHostedService
|
||||
if (_queueProcessingSemaphore.CurrentCount == 0)
|
||||
{
|
||||
_queue.Enqueue(request);
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await _queueSemaphore.WaitAsync().ConfigureAwait(false);
|
||||
var idx = Array.FindIndex(_userQueueRequests, r => r == null);
|
||||
if (idx == -1)
|
||||
{
|
||||
_queue.Enqueue(request);
|
||||
}
|
||||
else
|
||||
{
|
||||
await DequeueIntoSlotAsync(request, idx).ConfigureAwait(false);
|
||||
}
|
||||
_queue.Enqueue(request);
|
||||
|
||||
return;
|
||||
}
|
||||
@@ -68,6 +67,21 @@ public class RequestQueueService : IHostedService
|
||||
throw new Exception("Error during EnqueueUser");
|
||||
}
|
||||
|
||||
public void FinishRequest(Guid request)
|
||||
{
|
||||
var req = _userQueueRequests.First(f => f != null && f.UserRequest.RequestId == request);
|
||||
var idx = Array.IndexOf(_userQueueRequests, req);
|
||||
_logger.LogDebug("Finishing Request {guid}, clearing slot {idx}", request, idx);
|
||||
_userQueueRequests[idx] = null;
|
||||
}
|
||||
|
||||
public bool IsActiveProcessing(Guid request, string user, out UserRequest userRequest)
|
||||
{
|
||||
var userQueueRequest = _userQueueRequests.FirstOrDefault(u => u != null && u.UserRequest.RequestId == request && string.Equals(u.UserRequest.User, user, StringComparison.Ordinal));
|
||||
userRequest = userQueueRequest?.UserRequest ?? null;
|
||||
return userQueueRequest != null && userRequest != null && userQueueRequest.ExpirationDate > DateTime.UtcNow;
|
||||
}
|
||||
|
||||
public void RemoveFromQueue(Guid requestId, string user)
|
||||
{
|
||||
if (!_queue.Any(f => f.RequestId == requestId && string.Equals(f.User, user, StringComparison.Ordinal)))
|
||||
@@ -87,30 +101,31 @@ public class RequestQueueService : IHostedService
|
||||
_queueRemoval[requestId] = user;
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_queueTimer = new System.Timers.Timer(250);
|
||||
_queueTimer.Elapsed += ProcessQueue;
|
||||
_queueTimer.AutoReset = true;
|
||||
_queueTimer.Start();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public bool StillEnqueued(Guid request, string user)
|
||||
{
|
||||
return _queue.Any(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal));
|
||||
}
|
||||
|
||||
public bool IsActiveProcessing(Guid request, string user, out UserRequest userRequest)
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var userQueueRequest = _userQueueRequests.FirstOrDefault(u => u != null && u.UserRequest.RequestId == request && string.Equals(u.UserRequest.User, user, StringComparison.Ordinal));
|
||||
userRequest = userQueueRequest?.UserRequest ?? null;
|
||||
return userQueueRequest != null && userRequest != null && userQueueRequest.ExpirationDate > DateTime.UtcNow;
|
||||
_queueTimer.Stop();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public void FinishRequest(Guid request)
|
||||
private async Task DequeueIntoSlotAsync(UserRequest userRequest, int slot)
|
||||
{
|
||||
var req = _userQueueRequests.First(f => f != null && f.UserRequest.RequestId == request);
|
||||
var idx = Array.IndexOf(_userQueueRequests, req);
|
||||
_logger.LogDebug("Finishing Request {guid}, clearing slot {idx}", request, idx);
|
||||
_userQueueRequests[idx] = null;
|
||||
}
|
||||
|
||||
public void ActivateRequest(Guid request)
|
||||
{
|
||||
_logger.LogDebug("Activating request {guid}", request);
|
||||
_userQueueRequests.First(f => f != null && f.UserRequest.RequestId == request).IsActive = true;
|
||||
_logger.LogDebug("Dequeueing {req} into {i}: {user} with {file}", userRequest.RequestId, slot, userRequest.User, userRequest.FileId);
|
||||
_userQueueRequests[slot] = new(userRequest, DateTime.UtcNow.AddSeconds(_queueExpirationSeconds));
|
||||
await _hubContext.Clients.User(userRequest.User).SendAsync(nameof(IMareHub.Client_DownloadReady), userRequest.RequestId).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async void ProcessQueue(object src, ElapsedEventArgs e)
|
||||
@@ -161,7 +176,6 @@ public class RequestQueueService : IHostedService
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -174,26 +188,4 @@ public class RequestQueueService : IHostedService
|
||||
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueue, _queue.Count);
|
||||
}
|
||||
|
||||
private async Task DequeueIntoSlotAsync(UserRequest userRequest, int slot)
|
||||
{
|
||||
_logger.LogDebug("Dequeueing {req} into {i}: {user} with {file}", userRequest.RequestId, slot, userRequest.User, userRequest.FileId);
|
||||
_userQueueRequests[slot] = new(userRequest, DateTime.UtcNow.AddSeconds(_queueExpirationSeconds));
|
||||
await _hubContext.Clients.User(userRequest.User).SendAsync(nameof(IMareHub.Client_DownloadReady), userRequest.RequestId).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_queueTimer = new System.Timers.Timer(250);
|
||||
_queueTimer.Elapsed += ProcessQueue;
|
||||
_queueTimer.AutoReset = true;
|
||||
_queueTimer.Start();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_queueTimer.Stop();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,3 @@
|
||||
using Grpc.Net.Client.Configuration;
|
||||
using MareSynchronosShared.Data;
|
||||
using MareSynchronosShared.Metrics;
|
||||
using MareSynchronosShared.Services;
|
||||
@@ -16,6 +15,10 @@ using Microsoft.AspNetCore.SignalR;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.IdentityModel.Tokens;
|
||||
using Prometheus;
|
||||
using StackExchange.Redis.Extensions.Core.Configuration;
|
||||
using StackExchange.Redis.Extensions.System.Text.Json;
|
||||
using StackExchange.Redis;
|
||||
using System.Net;
|
||||
using System.Text;
|
||||
|
||||
namespace MareSynchronosStaticFilesServer;
|
||||
@@ -77,12 +80,6 @@ public class Startup
|
||||
options.EnableThreadSafetyChecks(false);
|
||||
}, mareConfig.GetValue(nameof(MareConfigurationBase.DbContextPoolSize), 1024));
|
||||
|
||||
var noRetryConfig = new MethodConfig
|
||||
{
|
||||
Names = { MethodName.Default },
|
||||
RetryPolicy = null,
|
||||
};
|
||||
|
||||
services.AddOptions<JwtBearerOptions>(JwtBearerDefaults.AuthenticationScheme)
|
||||
.Configure<IConfigurationService<MareConfigurationAuthBase>>((o, s) =>
|
||||
{
|
||||
@@ -111,11 +108,6 @@ public class Startup
|
||||
|
||||
if (_isMain)
|
||||
{
|
||||
services.AddGrpc(o =>
|
||||
{
|
||||
o.MaxReceiveMessageSize = null;
|
||||
});
|
||||
|
||||
services.AddSingleton<IConfigurationService<StaticFilesServerConfiguration>, MareConfigurationServiceServer<StaticFilesServerConfiguration>>();
|
||||
}
|
||||
else
|
||||
@@ -174,6 +166,39 @@ public class Startup
|
||||
var redisConnection = mareConfig.GetValue(nameof(ServerConfiguration.RedisConnectionString), string.Empty);
|
||||
signalRServiceBuilder.AddStackExchangeRedis(redisConnection, options => { });
|
||||
|
||||
var options = ConfigurationOptions.Parse(redisConnection);
|
||||
|
||||
var endpoint = options.EndPoints[0];
|
||||
string address = "";
|
||||
int port = 0;
|
||||
if (endpoint is DnsEndPoint dnsEndPoint) { address = dnsEndPoint.Host; port = dnsEndPoint.Port; }
|
||||
if (endpoint is IPEndPoint ipEndPoint) { address = ipEndPoint.Address.ToString(); port = ipEndPoint.Port; }
|
||||
var redisConfiguration = new RedisConfiguration()
|
||||
{
|
||||
AbortOnConnectFail = true,
|
||||
KeyPrefix = "",
|
||||
Hosts = new RedisHost[]
|
||||
{
|
||||
new RedisHost(){ Host = address, Port = port },
|
||||
},
|
||||
AllowAdmin = true,
|
||||
ConnectTimeout = options.ConnectTimeout,
|
||||
Database = 0,
|
||||
Ssl = false,
|
||||
Password = options.Password,
|
||||
ServerEnumerationStrategy = new ServerEnumerationStrategy()
|
||||
{
|
||||
Mode = ServerEnumerationStrategy.ModeOptions.All,
|
||||
TargetRole = ServerEnumerationStrategy.TargetRoleOptions.Any,
|
||||
UnreachableServerAction = ServerEnumerationStrategy.UnreachableServerActionOptions.Throw,
|
||||
},
|
||||
MaxValueLength = 1024,
|
||||
PoolSize = mareConfig.GetValue(nameof(ServerConfiguration.RedisPool), 50),
|
||||
SyncTimeout = options.SyncTimeout,
|
||||
};
|
||||
|
||||
services.AddStackExchangeRedisExtensions<SystemTextJsonSerializer>(redisConfiguration);
|
||||
|
||||
services.AddHealthChecks();
|
||||
}
|
||||
|
||||
@@ -195,10 +220,6 @@ public class Startup
|
||||
|
||||
app.UseEndpoints(e =>
|
||||
{
|
||||
if (_isMain)
|
||||
{
|
||||
e.MapGrpcService<GrpcFileService>();
|
||||
}
|
||||
e.MapHub<MareSynchronosServer.Hubs.MareHub>("/dummyhub");
|
||||
e.MapControllers();
|
||||
e.MapHealthChecks("/health").WithMetadata(new AllowAnonymousAttribute());
|
||||
|
||||
Reference in New Issue
Block a user