add FileDbService to handle file requests
This commit is contained in:
@@ -46,30 +46,29 @@ public partial class MareHub
|
||||
{
|
||||
_logger.LogCallInfo(MareHubLogger.Args(hashes.Count.ToString()));
|
||||
|
||||
var allFiles = await _dbContext.Files.Where(f => hashes.Contains(f.Hash)).ToListAsync().ConfigureAwait(false);
|
||||
var forbiddenFiles = await _dbContext.ForbiddenUploadEntries.
|
||||
Where(f => hashes.Contains(f.Hash)).ToListAsync().ConfigureAwait(false);
|
||||
hashes = hashes.Distinct(StringComparer.Ordinal).ToList();
|
||||
var allFiles = hashes.Select(_fileDbService.GetFileCache).Select(t => t.Result).Where(t => t != null).ToList();
|
||||
var forbiddenFiles = await _dbContext.ForbiddenUploadEntries.Where(f => hashes.Contains(f.Hash)).ToListAsync().ConfigureAwait(false);
|
||||
List<DownloadFileDto> response = new();
|
||||
|
||||
var cacheFile = await _dbContext.Files.AsNoTracking().Where(f => hashes.Contains(f.Hash)).AsNoTracking().Select(k => new { k.Hash, k.Size }).AsNoTracking().ToListAsync().ConfigureAwait(false);
|
||||
|
||||
var shardConfig = new List<CdnShardConfiguration>(_configurationService.GetValueOrDefault(nameof(ServerConfiguration.CdnShardConfiguration), new List<CdnShardConfiguration>()));
|
||||
|
||||
foreach (var file in cacheFile)
|
||||
foreach (var file in hashes)
|
||||
{
|
||||
var forbiddenFile = forbiddenFiles.SingleOrDefault(f => string.Equals(f.Hash, file.Hash, StringComparison.OrdinalIgnoreCase));
|
||||
var cacheFile = allFiles.SingleOrDefault(f => string.Equals(f.Hash, file, StringComparison.Ordinal));
|
||||
var forbiddenFile = forbiddenFiles.SingleOrDefault(f => string.Equals(f.Hash, file, StringComparison.OrdinalIgnoreCase));
|
||||
|
||||
var matchedShardConfig = shardConfig.OrderBy(g => Guid.NewGuid()).FirstOrDefault(f => new Regex(f.FileMatch).IsMatch(file.Hash));
|
||||
var matchedShardConfig = shardConfig.OrderBy(g => Guid.NewGuid()).FirstOrDefault(f => new Regex(f.FileMatch).IsMatch(file));
|
||||
var baseUrl = matchedShardConfig?.CdnFullUrl ?? _mainCdnFullUrl;
|
||||
|
||||
response.Add(new DownloadFileDto
|
||||
{
|
||||
FileExists = file.Size > 0,
|
||||
FileExists = cacheFile.Size > 0,
|
||||
ForbiddenBy = forbiddenFile?.ForbiddenBy ?? string.Empty,
|
||||
IsForbidden = forbiddenFile != null,
|
||||
Hash = file.Hash,
|
||||
Size = file.Size,
|
||||
Url = new Uri(baseUrl, file.Hash.ToUpperInvariant()).ToString()
|
||||
Hash = cacheFile.Hash,
|
||||
Size = cacheFile.Size,
|
||||
Url = new Uri(baseUrl, cacheFile.Hash.ToUpperInvariant()).ToString()
|
||||
});
|
||||
}
|
||||
|
||||
@@ -91,7 +90,7 @@ public partial class MareHub
|
||||
_logger.LogCallInfo(MareHubLogger.Args(userSentHashes.Count.ToString()));
|
||||
var notCoveredFiles = new Dictionary<string, UploadFileDto>(StringComparer.Ordinal);
|
||||
var forbiddenFiles = await _dbContext.ForbiddenUploadEntries.AsNoTracking().Where(f => userSentHashes.Contains(f.Hash)).AsNoTracking().ToDictionaryAsync(f => f.Hash, f => f).ConfigureAwait(false);
|
||||
var existingFiles = await _dbContext.Files.AsNoTracking().Where(f => userSentHashes.Contains(f.Hash)).AsNoTracking().ToDictionaryAsync(f => f.Hash, f => f).ConfigureAwait(false);
|
||||
var existingFiles = fileListHashes.Select(_fileDbService.GetFileCache).Select(t => t.Result).Where(t => t != null).ToDictionary(k => k.Hash, k => k, StringComparer.Ordinal);
|
||||
var uploader = await _dbContext.Users.SingleAsync(u => u.UID == UserUID).ConfigureAwait(false);
|
||||
|
||||
List<FileCache> fileCachesToUpload = new();
|
||||
|
||||
@@ -20,6 +20,7 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
|
||||
private readonly SystemInfoService _systemInfoService;
|
||||
private readonly IHttpContextAccessor _contextAccessor;
|
||||
private readonly IClientIdentificationService _clientIdentService;
|
||||
private readonly FileDbService _fileDbService;
|
||||
private readonly MareHubLogger _logger;
|
||||
private readonly MareDbContext _dbContext;
|
||||
private readonly Uri _mainCdnFullUrl;
|
||||
@@ -32,7 +33,7 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
|
||||
public MareHub(MareMetrics mareMetrics, FileService.FileServiceClient fileServiceClient,
|
||||
MareDbContext mareDbContext, ILogger<MareHub> logger, SystemInfoService systemInfoService,
|
||||
IConfigurationService<ServerConfiguration> configuration, IHttpContextAccessor contextAccessor,
|
||||
IClientIdentificationService clientIdentService)
|
||||
IClientIdentificationService clientIdentService, FileDbService fileDbService)
|
||||
{
|
||||
_mareMetrics = mareMetrics;
|
||||
_fileServiceClient = fileServiceClient;
|
||||
@@ -45,6 +46,7 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
|
||||
_maxGroupUserCount = configuration.GetValueOrDefault(nameof(ServerConfiguration.MaxGroupUserCount), 100);
|
||||
_contextAccessor = contextAccessor;
|
||||
_clientIdentService = clientIdentService;
|
||||
_fileDbService = fileDbService;
|
||||
_logger = new MareHubLogger(this, logger);
|
||||
_dbContext = mareDbContext;
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ public class IdentityHandler
|
||||
|
||||
internal void EnqueueIdentChange(IdentChange identchange)
|
||||
{
|
||||
_logger.LogInformation("Enqueued " + identchange.UidWithIdent.Uid.Uid + ":" + identchange.IsOnline + " from " + identchange.UidWithIdent.Ident.ServerId);
|
||||
_logger.LogDebug("Enqueued " + identchange.UidWithIdent.Uid.Uid + ":" + identchange.IsOnline + " from " + identchange.UidWithIdent.Ident.ServerId);
|
||||
|
||||
foreach (var k in _identChanges.Keys)
|
||||
{
|
||||
|
||||
@@ -0,0 +1,71 @@
|
||||
using MareSynchronosShared.Data;
|
||||
using MareSynchronosShared.Models;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
|
||||
namespace MareSynchronosServer.Services;
|
||||
|
||||
public class FileDbService : IHostedService
|
||||
{
|
||||
private readonly IServiceProvider _serviceProvider;
|
||||
private readonly ILogger<FileDbService> _logger;
|
||||
private Dictionary<string, FileCache> _fileCaches = new(StringComparer.Ordinal);
|
||||
private readonly SemaphoreSlim _semaphore = new(5);
|
||||
private readonly CancellationTokenSource _shutdownCancellationToken = new();
|
||||
|
||||
public FileDbService(IServiceProvider serviceProvider, ILogger<FileDbService> logger)
|
||||
{
|
||||
_serviceProvider = serviceProvider;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public async Task<FileCache?> GetFileCache(string hash)
|
||||
{
|
||||
await _semaphore.WaitAsync().ConfigureAwait(false);
|
||||
if (!_fileCaches.TryGetValue(hash, out var cache))
|
||||
{
|
||||
using var db = _serviceProvider.GetService<MareDbContext>();
|
||||
cache = db.Files.AsNoTracking().SingleOrDefault(f => f.Hash == hash && f.Uploaded);
|
||||
if (cache != null) _fileCaches[hash] = cache;
|
||||
}
|
||||
_semaphore.Release();
|
||||
|
||||
return cache;
|
||||
}
|
||||
|
||||
private async Task UpdateDatabasePeriodically(CancellationToken ct)
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
var now = DateTime.Now;
|
||||
TimeOnly currentTime = new(now.Hour, now.Minute, now.Second);
|
||||
TimeOnly futureTime = new(now.Hour, now.Minute - now.Minute % 10 + 5, 0);
|
||||
var span = futureTime.AddMinutes(10) - currentTime;
|
||||
|
||||
await Task.Delay(span, ct).ConfigureAwait(false);
|
||||
|
||||
await UpdateDatabase().ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task UpdateDatabase()
|
||||
{
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
using var db = scope.ServiceProvider.GetRequiredService<MareDbContext>();
|
||||
_fileCaches = new(await db.Files.AsNoTracking().Where(f => f.Uploaded).AsNoTracking().ToDictionaryAsync(k => k.Hash, k => k).ConfigureAwait(false), StringComparer.Ordinal);
|
||||
_logger.LogInformation("Updated FileCaches, now at {count}", _fileCaches.Count);
|
||||
}
|
||||
|
||||
public async Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await UpdateDatabase().ConfigureAwait(false);
|
||||
_ = UpdateDatabasePeriodically(_shutdownCancellationToken.Token);
|
||||
_logger.LogInformation("Started FileDb Service, initially at {count} files from DB", _fileCaches.Count);
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_shutdownCancellationToken.Cancel();
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -78,6 +78,8 @@ public class Startup
|
||||
services.AddSingleton<SystemInfoService>();
|
||||
services.AddSingleton<IUserIdProvider, IdBasedUserIdProvider>();
|
||||
services.AddHostedService(provider => provider.GetService<SystemInfoService>());
|
||||
services.AddSingleton<FileDbService>();
|
||||
services.AddHostedService(p => p.GetService<FileDbService>());
|
||||
// configure services based on main server status
|
||||
ConfigureServicesBasedOnShardType(services, mareConfig, isMainServer);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user