267 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
			
		
		
	
	
			267 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
using LZ4;
 | 
						|
using MareSynchronos.API.Data;
 | 
						|
using MareSynchronos.API.Dto.Files;
 | 
						|
using MareSynchronos.API.Routes;
 | 
						|
using MareSynchronos.FileCache;
 | 
						|
using MareSynchronos.MareConfiguration;
 | 
						|
using MareSynchronos.Services.Mediator;
 | 
						|
using MareSynchronos.Services.ServerConfiguration;
 | 
						|
using MareSynchronos.UI;
 | 
						|
using MareSynchronos.WebAPI.Files.Models;
 | 
						|
using Microsoft.Extensions.Logging;
 | 
						|
using System.Net.Http.Headers;
 | 
						|
using System.Net.Http.Json;
 | 
						|
 | 
						|
namespace MareSynchronos.WebAPI.Files;
 | 
						|
 | 
						|
public sealed class FileUploadManager : DisposableMediatorSubscriberBase
 | 
						|
{
 | 
						|
    private readonly FileCacheManager _fileDbManager;
 | 
						|
    private readonly MareConfigService _mareConfigService;
 | 
						|
    private readonly FileTransferOrchestrator _orchestrator;
 | 
						|
    private readonly ServerConfigurationManager _serverManager;
 | 
						|
    private readonly Dictionary<string, DateTime> _verifiedUploadedHashes = new(StringComparer.Ordinal);
 | 
						|
    private CancellationTokenSource? _uploadCancellationTokenSource = new();
 | 
						|
 | 
						|
    /// <summary>
    /// Creates the upload manager and subscribes it to <see cref="DisconnectedMessage"/>,
    /// resetting all upload state whenever that message is received.
    /// </summary>
    /// <param name="logger">Logger handed to the mediator subscriber base class.</param>
    /// <param name="mediator">Mediator handed to the base class and used for the subscription.</param>
    /// <param name="mareConfigService">Configuration service consulted for the upload mode.</param>
    /// <param name="orchestrator">Orchestrator used to send the HTTP requests.</param>
    /// <param name="fileDbManager">File cache used to resolve hashes to local files.</param>
    /// <param name="serverManager">Server configuration, used for logging the current API URL.</param>
    public FileUploadManager(ILogger<FileUploadManager> logger, MareMediator mediator,
        MareConfigService mareConfigService,
        FileTransferOrchestrator orchestrator,
        FileCacheManager fileDbManager,
        ServerConfigurationManager serverManager) : base(logger, mediator)
    {
        _fileDbManager = fileDbManager;
        _mareConfigService = mareConfigService;
        _orchestrator = orchestrator;
        _serverManager = serverManager;

        // A disconnect cancels any running upload and drops all cached state.
        Mediator.Subscribe<DisconnectedMessage>(this, _ => Reset());
    }
 | 
						|
 | 
						|
    /// <summary>Transfers that belong to the upload currently being processed.</summary>
    public List<FileTransfer> CurrentUploads { get; } = new List<FileTransfer>();

    /// <summary>True while <see cref="CurrentUploads"/> contains at least one transfer.</summary>
    public bool IsUploading => CurrentUploads.Count != 0;
 | 
						|
 | 
						|
    /// <summary>
    /// Cancels the upload in progress, if there is one.
    /// </summary>
    /// <returns>
    /// <c>true</c> when <see cref="CurrentUploads"/> was non-empty and the upload was cancelled;
    /// <c>false</c> when there was nothing to cancel.
    /// </returns>
    public bool CancelUpload()
    {
        if (CurrentUploads.Count == 0)
        {
            return false;
        }

        Logger.LogDebug("Cancelling current upload");

        // Signal cancellation first, then release the source and forget it.
        if (_uploadCancellationTokenSource is { } tokenSource)
        {
            tokenSource.Cancel();
            tokenSource.Dispose();
        }

        _uploadCancellationTokenSource = null;
        CurrentUploads.Clear();
        return true;
    }
 | 
						|
 | 
						|
    /// <summary>
    /// Sends a POST request to the "delete all files" route of the files CDN.
    /// </summary>
    /// <exception cref="InvalidOperationException">The orchestrator is not initialized.</exception>
    public async Task DeleteAllFiles()
    {
        if (!_orchestrator.IsInitialized)
        {
            throw new InvalidOperationException("FileTransferManager is not initialized");
        }

        var deleteAllUri = MareFiles.ServerFilesDeleteAllFullPath(_orchestrator.FilesCdnUri!);
        await _orchestrator.SendRequestAsync(HttpMethod.Post, deleteAllUri).ConfigureAwait(false);
    }
 | 
						|
 | 
						|
    /// <summary>
    /// Cancels any upload in progress, uploads the replacement files of <paramref name="data"/>
    /// that are not currently considered verified, and removes every replacement whose hash
    /// matches a forbidden transfer.
    /// </summary>
    /// <param name="data">Character data whose file replacements are inspected and filtered in place.</param>
    /// <param name="visiblePlayers">Users passed on to the upload request.</param>
    /// <returns>The same <paramref name="data"/> instance, with forbidden replacements removed.</returns>
    public async Task<CharacterData> UploadFiles(CharacterData data, List<UserData> visiblePlayers)
    {
        CancelUpload();

        var tokenSource = new CancellationTokenSource();
        _uploadCancellationTokenSource = tokenSource;
        var uploadToken = tokenSource.Token;
        Logger.LogDebug("Sending Character data {hash} to service {url}", data.DataHash.Value, _serverManager.CurrentApiUrl);

        var pendingHashes = GetUnverifiedFiles(data);
        if (pendingHashes.Count > 0)
        {
            await UploadUnverifiedFiles(pendingHashes, visiblePlayers, uploadToken).ConfigureAwait(false);
            Logger.LogInformation("Upload complete for {hash}", data.DataHash.Value);
        }

        // Forbidden hashes are compared case-insensitively here.
        foreach (var entry in data.FileReplacements)
        {
            entry.Value.RemoveAll(replacement =>
                _orchestrator.ForbiddenTransfers.Any(forbidden =>
                    string.Equals(forbidden.Hash, replacement.Hash, StringComparison.OrdinalIgnoreCase)));
        }

        return data;
    }
 | 
						|
 | 
						|
    /// <summary>
    /// Disposes the base class and then resets this manager's upload state.
    /// </summary>
    /// <param name="disposing">Forwarded unchanged to the base implementation.</param>
    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);

        // Cancels a running upload and clears the upload list and verification cache.
        Reset();
    }
 | 
						|
 | 
						|
    /// <summary>
    /// Posts the given file hashes and user ids to the "files send" route and returns the
    /// upload descriptors contained in the response.
    /// </summary>
    /// <param name="hashes">File hashes to announce.</param>
    /// <param name="uids">User ids sent along with the hashes.</param>
    /// <param name="ct">Token used for the request and for reading the response body.</param>
    /// <returns>The deserialized list, or an empty list when the body deserializes to null.</returns>
    /// <exception cref="InvalidOperationException">The orchestrator is not initialized.</exception>
    private async Task<List<UploadFileDto>> FilesSend(List<string> hashes, List<string> uids, CancellationToken ct)
    {
        if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized");

        FilesSendDto filesSendDto = new()
        {
            FileHashes = hashes,
            UIDs = uids
        };

        // Dispose the response once the body has been read so its content is released promptly.
        using var response = await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.ServerFilesFilesSendFullPath(_orchestrator.FilesCdnUri!), filesSendDto, ct).ConfigureAwait(false);
        return await response.Content.ReadFromJsonAsync<List<UploadFileDto>>(cancellationToken: ct).ConfigureAwait(false) ?? new List<UploadFileDto>();
    }
 | 
						|
 | 
						|
    /// <summary>
    /// Reads the cached file that belongs to <paramref name="fileHash"/> and compresses its
    /// content with <c>LZ4Codec.WrapHC</c>.
    /// </summary>
    /// <param name="fileHash">Hash used to look up the file in the file cache.</param>
    /// <param name="uploadToken">Token used to cancel the file read.</param>
    /// <returns>The unchanged hash together with the compressed bytes.</returns>
    /// <exception cref="FileNotFoundException">The file cache has no entry for the hash.</exception>
    private async Task<(string, byte[])> GetCompressedFileData(string fileHash, CancellationToken uploadToken)
    {
        var filePath = _fileDbManager.GetFileCacheByHash(fileHash)?.ResolvedFilepath
            ?? throw new FileNotFoundException($"No cached file is known for hash {fileHash}");

        var rawData = await File.ReadAllBytesAsync(filePath, uploadToken).ConfigureAwait(false);

        // Use the length of the buffer that was actually read. A second stat of the file could
        // disagree with the buffer if the file changed in between, and needed a long-to-int cast.
        return (fileHash, LZ4Codec.WrapHC(rawData, 0, rawData.Length));
    }
 | 
						|
 | 
						|
    /// <summary>
    /// Collects the hashes of all file replacements without a file swap path that were either
    /// never verified or last verified more than ten minutes ago.
    /// </summary>
    /// <param name="data">Character data whose file replacements are inspected.</param>
    /// <returns>The distinct hashes (ordinal comparison) that need verification.</returns>
    private HashSet<string> GetUnverifiedFiles(CharacterData data)
    {
        var result = new HashSet<string>(StringComparer.Ordinal);

        var candidateHashes = data.FileReplacements
            .SelectMany(entry => entry.Value)
            .Where(replacement => string.IsNullOrEmpty(replacement.FileSwapPath))
            .Select(replacement => replacement.Hash)
            .Distinct(StringComparer.Ordinal)
            .ToList();

        foreach (var hash in candidateHashes)
        {
            // Hashes that were never verified count as verified at DateTime.MinValue.
            var lastVerified = _verifiedUploadedHashes.TryGetValue(hash, out var stored) ? stored : DateTime.MinValue;

            var stillFresh = lastVerified >= DateTime.UtcNow.Subtract(TimeSpan.FromMinutes(10));
            if (stillFresh) continue;

            Logger.LogTrace("Verifying {item}, last verified: {date}", hash, lastVerified);
            result.Add(hash);
        }

        return result;
    }
 | 
						|
 | 
						|
    /// <summary>
    /// Cancels and disposes the current cancellation token source, then clears the list of
    /// current uploads and the cache of verified hashes.
    /// </summary>
    private void Reset()
    {
        var tokenSource = _uploadCancellationTokenSource;
        if (tokenSource is not null)
        {
            tokenSource.Cancel();
            tokenSource.Dispose();
        }

        _uploadCancellationTokenSource = null;

        CurrentUploads.Clear();
        _verifiedUploadedHashes.Clear();
    }
 | 
						|
 | 
						|
    /// <summary>
    /// Uploads one compressed file. Depending on the configuration the file is sent either as a
    /// stream or as a full byte array; when the stream upload fails, the full upload is tried
    /// once as a fallback.
    /// </summary>
    /// <param name="compressedFile">Compressed file content to send.</param>
    /// <param name="fileHash">Hash identifying the file.</param>
    /// <param name="uploadToken">Token used to cancel the upload.</param>
    /// <exception cref="InvalidOperationException">The orchestrator is not initialized.</exception>
    /// <exception cref="OperationCanceledException">
    /// The upload was interrupted because <paramref name="uploadToken"/> was cancelled.
    /// </exception>
    private async Task UploadFile(byte[] compressedFile, string fileHash, CancellationToken uploadToken)
    {
        if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized");

        Logger.LogInformation("[{hash}] Uploading {size}", fileHash, UiSharedService.ByteToString(compressedFile.Length));

        if (uploadToken.IsCancellationRequested) return;

        // Read the setting once so the upload attempt and the fallback decision cannot disagree.
        var useAlternativeUpload = _mareConfigService.Current.UseAlternativeFileUpload;

        try
        {
            if (useAlternativeUpload)
            {
                await UploadFileFull(compressedFile, fileHash, uploadToken).ConfigureAwait(false);
            }
            else
            {
                await UploadFileStream(compressedFile, fileHash, uploadToken).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException) when (uploadToken.IsCancellationRequested)
        {
            // A requested cancellation is not an upload error: skip the fallback and let the
            // cancellation reach the caller. Cancellations not caused by our token (for example
            // a request timeout) still fall through to the handlers below.
            Logger.LogDebug("[{hash}] Upload cancelled", fileHash);
            throw;
        }
        catch (Exception ex) when (!useAlternativeUpload)
        {
            Logger.LogWarning(ex, "[{hash}] Error during file upload, trying alternative file upload", fileHash);
            await UploadFileFull(compressedFile, fileHash, uploadToken).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // The alternative upload has no further fallback. Keep the best-effort behaviour of
            // not throwing, but make the failure visible instead of swallowing it silently.
            Logger.LogError(ex, "[{hash}] Error during alternative file upload", fileHash);
        }
    }
 | 
						|
 | 
						|
    /// <summary>
    /// Uploads the compressed file in a single request as <c>application/octet-stream</c> and
    /// afterwards marks the matching entry in <see cref="CurrentUploads"/> as fully transferred.
    /// </summary>
    /// <param name="compressedFile">Compressed file content to send.</param>
    /// <param name="fileHash">Hash identifying the file; part of the request path.</param>
    /// <param name="uploadToken">Token used to cancel the request.</param>
    private async Task UploadFileFull(byte[] compressedFile, string fileHash, CancellationToken uploadToken)
    {
        using var content = new ByteArrayContent(compressedFile);
        content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");

        using var response = await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.ServerFilesUploadFullPath(_orchestrator.FilesCdnUri!, fileHash), content, uploadToken).ConfigureAwait(false);
        Logger.LogDebug("[{hash}] Upload Status: {status}", fileHash, response.StatusCode);

        try
        {
            CurrentUploads.Single(f => string.Equals(f.Hash, fileHash, StringComparison.Ordinal)).Transferred = compressedFile.Length;
        }
        catch (InvalidOperationException ex)
        {
            // The upload list can be cleared while the request is running (cancel or reset).
            // Progress bookkeeping must not turn a finished upload into a failure, so this is
            // handled the same way as the progress callback of the stream upload.
            Logger.LogWarning(ex, "[{hash}] Could not set upload progress", fileHash);
        }
    }
 | 
						|
 | 
						|
    /// <summary>
    /// Uploads the compressed file as a stream with content type <c>application/octet-stream</c>,
    /// writing reported progress to the matching entry in <see cref="CurrentUploads"/>.
    /// </summary>
    /// <param name="compressedFile">Compressed file content to send.</param>
    /// <param name="fileHash">Hash identifying the file; part of the request path.</param>
    /// <param name="uploadToken">Token used to cancel the request.</param>
    private async Task UploadFileStream(byte[] compressedFile, string fileHash, CancellationToken uploadToken)
    {
        using var ms = new MemoryStream(compressedFile);

        // Progress updates are best effort: a missing list entry is logged, never thrown.
        void ReportProgress(UploadProgress progress)
        {
            try
            {
                var transfer = CurrentUploads.Single(f => string.Equals(f.Hash, fileHash, StringComparison.Ordinal));
                transfer.Transferred = progress.Uploaded;
            }
            catch (Exception ex)
            {
                Logger.LogWarning(ex, "[{hash}] Could not set upload progress", fileHash);
            }
        }

        var progressReporter = new Progress<UploadProgress>(ReportProgress);
        var streamContent = new ProgressableStreamContent(ms, progressReporter);
        streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");

        var uploadUri = MareFiles.ServerFilesUploadFullPath(_orchestrator.FilesCdnUri!, fileHash);
        var response = await _orchestrator.SendRequestStreamAsync(HttpMethod.Post, uploadUri, streamContent, uploadToken).ConfigureAwait(false);
        Logger.LogDebug("[{hash}] Upload Status: {status}", fileHash, response.StatusCode);
    }
 | 
						|
 | 
						|
    /// <summary>
    /// Announces the given hashes to the server, records forbidden files, and compresses and
    /// uploads every file the server asks for. Compression of the next file overlaps with the
    /// upload of the previous one.
    /// </summary>
    /// <param name="unverifiedUploadHashes">Hashes to verify; hashes without a file cache entry are ignored.</param>
    /// <param name="visiblePlayers">Users whose UIDs are sent along with the hashes.</param>
    /// <param name="uploadToken">Token used to cancel the whole operation.</param>
    /// <exception cref="OperationCanceledException"><paramref name="uploadToken"/> was cancelled.</exception>
    private async Task UploadUnverifiedFiles(HashSet<string> unverifiedUploadHashes, List<UserData> visiblePlayers, CancellationToken uploadToken)
    {
        // Only hashes that still resolve to a file cache entry can be uploaded.
        unverifiedUploadHashes = unverifiedUploadHashes.Where(h => _fileDbManager.GetFileCacheByHash(h) != null).ToHashSet(StringComparer.Ordinal);

        Logger.LogDebug("Verifying {count} files", unverifiedUploadHashes.Count);
        var filesToUpload = await FilesSend(unverifiedUploadHashes.ToList(), visiblePlayers.Select(p => p.UID).ToList(), uploadToken).ConfigureAwait(false);

        foreach (var file in filesToUpload.Where(f => !f.IsForbidden))
        {
            try
            {
                CurrentUploads.Add(new UploadFileTransfer(file)
                {
                    Total = new FileInfo(_fileDbManager.GetFileCacheByHash(file.Hash)!.ResolvedFilepath).Length,
                });
            }
            catch (Exception ex)
            {
                Logger.LogWarning(ex, "Tried to request file {hash} but file was not present", file.Hash);
            }
        }

        foreach (var file in filesToUpload.Where(c => c.IsForbidden))
        {
            if (_orchestrator.ForbiddenTransfers.All(f => !string.Equals(f.Hash, file.Hash, StringComparison.Ordinal)))
            {
                _orchestrator.ForbiddenTransfers.Add(new UploadFileTransfer(file)
                {
                    LocalFile = _fileDbManager.GetFileCacheByHash(file.Hash)?.ResolvedFilepath ?? string.Empty,
                });
            }

            // Forbidden files are never uploaded, so there is nothing left to verify for them.
            _verifiedUploadedHashes[file.Hash] = DateTime.UtcNow;
        }

        var totalSize = CurrentUploads.Sum(c => c.Total);
        Logger.LogDebug("Compressing and uploading files");

        Task uploadTask = Task.CompletedTask;
        string? pendingHash = null;

        // Waits for the upload in flight and stamps its hash as verified only once that upload
        // has finished without throwing and without the operation having been cancelled.
        async Task CompletePendingUploadAsync()
        {
            await uploadTask.ConfigureAwait(false);
            uploadToken.ThrowIfCancellationRequested();
            if (pendingHash != null)
            {
                _verifiedUploadedHashes[pendingHash] = DateTime.UtcNow;
            }
        }

        foreach (var file in CurrentUploads.Where(f => f.CanBeTransferred && !f.IsTransferred).ToList())
        {
            Logger.LogDebug("[{hash}] Compressing", file.Hash);
            var (hash, compressed) = await GetCompressedFileData(file.Hash, uploadToken).ConfigureAwait(false);

            // From here on Total tracks the compressed size, which is what gets transferred.
            file.Total = compressed.Length;
            Logger.LogDebug("[{hash}] Starting upload for {filePath}", hash, _fileDbManager.GetFileCacheByHash(hash)?.ResolvedFilepath);

            await CompletePendingUploadAsync().ConfigureAwait(false);
            uploadTask = UploadFile(compressed, hash, uploadToken);
            pendingHash = hash;
            uploadToken.ThrowIfCancellationRequested();
        }

        // Always observe the last upload, even if the upload list was cleared in the meantime.
        await CompletePendingUploadAsync().ConfigureAwait(false);

        if (CurrentUploads.Any())
        {
            var compressedSize = CurrentUploads.Sum(c => c.Total);
            Logger.LogDebug("Upload complete, compressed {size} to {compressed}", UiSharedService.ByteToString(totalSize), UiSharedService.ByteToString(compressedSize));
        }

        // Hashes that never made it into the upload list were not requested by the server
        // (or were handled above), so they count as verified as well.
        var listedHashes = CurrentUploads.Select(u => u.Hash).ToHashSet(StringComparer.Ordinal);
        foreach (var hash in unverifiedUploadHashes.Where(h => !listedHashes.Contains(h)))
        {
            _verifiedUploadedHashes[hash] = DateTime.UtcNow;
        }

        CurrentUploads.Clear();
    }
 | 
						|
} |