452 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
			
		
		
	
	
			452 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
| using Dalamud.Utility;
 | |
| using LZ4;
 | |
| using MareSynchronos.API.Data;
 | |
| using MareSynchronos.API.Dto.Files;
 | |
| using MareSynchronos.API.Routes;
 | |
| using MareSynchronos.FileCache;
 | |
| using MareSynchronos.PlayerData.Handlers;
 | |
| using MareSynchronos.Services.Mediator;
 | |
| using MareSynchronos.WebAPI.Files.Models;
 | |
| using Microsoft.Extensions.Logging;
 | |
| using System.Net;
 | |
| using System.Net.Http.Json;
 | |
| 
 | |
| namespace MareSynchronos.WebAPI.Files;
 | |
| 
 | |
| public partial class FileDownloadManager : DisposableMediatorSubscriberBase
 | |
| {
 | |
|     private readonly Dictionary<string, FileDownloadStatus> _downloadStatus;
 | |
|     private readonly FileCompactor _fileCompactor;
 | |
|     private readonly FileCacheManager _fileDbManager;
 | |
|     private readonly FileTransferOrchestrator _orchestrator;
 | |
|     private readonly List<ThrottledStream> _activeDownloadStreams;
 | |
| 
 | |
|     public FileDownloadManager(ILogger<FileDownloadManager> logger, MareMediator mediator,
 | |
|         FileTransferOrchestrator orchestrator,
 | |
|         FileCacheManager fileCacheManager, FileCompactor fileCompactor) : base(logger, mediator)
 | |
|     {
 | |
|         _downloadStatus = new Dictionary<string, FileDownloadStatus>(StringComparer.Ordinal);
 | |
|         _orchestrator = orchestrator;
 | |
|         _fileDbManager = fileCacheManager;
 | |
|         _fileCompactor = fileCompactor;
 | |
|         _activeDownloadStreams = [];
 | |
| 
 | |
|         Mediator.Subscribe<DownloadLimitChangedMessage>(this, (msg) =>
 | |
|         {
 | |
|             if (!_activeDownloadStreams.Any()) return;
 | |
|             var newLimit = _orchestrator.DownloadLimitPerSlot();
 | |
|             Logger.LogTrace("Setting new Download Speed Limit to {newLimit}", newLimit);
 | |
|             foreach (var stream in _activeDownloadStreams)
 | |
|             {
 | |
|                 stream.BandwidthLimit = newLimit;
 | |
|             }
 | |
|         });
 | |
|     }
 | |
| 
 | |
|     public List<DownloadFileTransfer> CurrentDownloads { get; private set; } = [];
 | |
| 
 | |
|     public List<FileTransfer> ForbiddenTransfers => _orchestrator.ForbiddenTransfers;
 | |
| 
 | |
|     public bool IsDownloading => !CurrentDownloads.Any();
 | |
| 
 | |
|     public static void MungeBuffer(Span<byte> buffer)
 | |
|     {
 | |
|         for (int i = 0; i < buffer.Length; ++i)
 | |
|         {
 | |
|             buffer[i] ^= 42;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     public void CancelDownload()
 | |
|     {
 | |
|         CurrentDownloads.Clear();
 | |
|         _downloadStatus.Clear();
 | |
|     }
 | |
| 
 | |
|     public async Task DownloadFiles(GameObjectHandler gameObject, List<FileReplacementData> fileReplacementDto, CancellationToken ct)
 | |
|     {
 | |
|         Mediator.Publish(new HaltScanMessage(nameof(DownloadFiles)));
 | |
|         try
 | |
|         {
 | |
|             await DownloadFilesInternal(gameObject, fileReplacementDto, ct).ConfigureAwait(false);
 | |
|         }
 | |
|         catch
 | |
|         {
 | |
|             CancelDownload();
 | |
|         }
 | |
|         finally
 | |
|         {
 | |
|             Mediator.Publish(new DownloadFinishedMessage(gameObject));
 | |
|             Mediator.Publish(new ResumeScanMessage(nameof(DownloadFiles)));
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     protected override void Dispose(bool disposing)
 | |
|     {
 | |
|         CancelDownload();
 | |
|         foreach (var stream in _activeDownloadStreams)
 | |
|         {
 | |
|             try
 | |
|             {
 | |
|                 stream.Dispose();
 | |
|             }
 | |
|             catch { }
 | |
|         }
 | |
|         base.Dispose(disposing);
 | |
|     }
 | |
| 
 | |
|     private static byte MungeByte(int byteOrEof)
 | |
|     {
 | |
|         if (byteOrEof == -1)
 | |
|         {
 | |
|             throw new EndOfStreamException();
 | |
|         }
 | |
| 
 | |
|         return (byte)(byteOrEof ^ 42);
 | |
|     }
 | |
| 
 | |
|     private static (string fileHash, long fileLengthBytes) ReadBlockFileHeader(FileStream fileBlockStream)
 | |
|     {
 | |
|         List<char> hashName = [];
 | |
|         List<char> fileLength = [];
 | |
|         var separator = (char)MungeByte(fileBlockStream.ReadByte());
 | |
|         if (separator != '#') throw new InvalidDataException("Data is invalid, first char is not #");
 | |
| 
 | |
|         bool readHash = false;
 | |
|         while (true)
 | |
|         {
 | |
|             var readChar = (char)MungeByte(fileBlockStream.ReadByte());
 | |
|             if (readChar == ':')
 | |
|             {
 | |
|                 readHash = true;
 | |
|                 continue;
 | |
|             }
 | |
|             if (readChar == '#') break;
 | |
|             if (!readHash) hashName.Add(readChar);
 | |
|             else fileLength.Add(readChar);
 | |
|         }
 | |
|         return (string.Join("", hashName), long.Parse(string.Join("", fileLength)));
 | |
|     }
 | |
| 
 | |
|     private async Task DownloadAndMungeFileHttpClient(string downloadGroup, Guid requestId, List<DownloadFileTransfer> fileTransfer, string tempPath, IProgress<long> progress, CancellationToken ct)
 | |
|     {
 | |
|         Logger.LogDebug("GUID {requestId} on server {uri} for files {files}", requestId, fileTransfer[0].DownloadUri, string.Join(", ", fileTransfer.Select(c => c.Hash).ToList()));
 | |
| 
 | |
|         await WaitForDownloadReady(fileTransfer, requestId, ct).ConfigureAwait(false);
 | |
| 
 | |
|         _downloadStatus[downloadGroup].DownloadStatus = DownloadStatus.Downloading;
 | |
| 
 | |
|         HttpResponseMessage response = null!;
 | |
|         var requestUrl = MareFiles.CacheGetFullPath(fileTransfer[0].DownloadUri, requestId);
 | |
| 
 | |
|         Logger.LogDebug("Downloading {requestUrl} for request {id}", requestUrl, requestId);
 | |
|         try
 | |
|         {
 | |
|             response = await _orchestrator.SendRequestAsync(HttpMethod.Get, requestUrl, ct, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
 | |
|             response.EnsureSuccessStatusCode();
 | |
|         }
 | |
|         catch (HttpRequestException ex)
 | |
|         {
 | |
|             Logger.LogWarning(ex, "Error during download of {requestUrl}, HttpStatusCode: {code}", requestUrl, ex.StatusCode);
 | |
|             if (ex.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.Unauthorized)
 | |
|             {
 | |
|                 throw new InvalidDataException($"Http error {ex.StatusCode} (cancelled: {ct.IsCancellationRequested}): {requestUrl}", ex);
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         ThrottledStream? stream = null;
 | |
|         try
 | |
|         {
 | |
|             var fileStream = File.Create(tempPath);
 | |
|             await using (fileStream.ConfigureAwait(false))
 | |
|             {
 | |
|                 var bufferSize = response.Content.Headers.ContentLength > 1024 * 1024 ? 4096 : 1024;
 | |
|                 var buffer = new byte[bufferSize];
 | |
| 
 | |
|                 var bytesRead = 0;
 | |
|                 var limit = _orchestrator.DownloadLimitPerSlot();
 | |
|                 Logger.LogTrace("Starting Download of {id} with a speed limit of {limit}", requestId, limit);
 | |
|                 stream = new ThrottledStream(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit);
 | |
|                 _activeDownloadStreams.Add(stream);
 | |
|                 while ((bytesRead = await stream.ReadAsync(buffer, ct).ConfigureAwait(false)) > 0)
 | |
|                 {
 | |
|                     ct.ThrowIfCancellationRequested();
 | |
| 
 | |
|                     MungeBuffer(buffer.AsSpan(0, bytesRead));
 | |
| 
 | |
|                     await fileStream.WriteAsync(buffer.AsMemory(0, bytesRead), ct).ConfigureAwait(false);
 | |
| 
 | |
|                     progress.Report(bytesRead);
 | |
|                 }
 | |
| 
 | |
|                 Logger.LogDebug("{requestUrl} downloaded to {tempPath}", requestUrl, tempPath);
 | |
|             }
 | |
|         }
 | |
|         catch (Exception ex)
 | |
|         {
 | |
|             Logger.LogWarning(ex, "Error during file download of {requestUrl}", requestUrl);
 | |
|             try
 | |
|             {
 | |
|                 if (!tempPath.IsNullOrEmpty())
 | |
|                     File.Delete(tempPath);
 | |
|             }
 | |
|             catch
 | |
|             {
 | |
|                 // ignore if file deletion fails
 | |
|             }
 | |
|             throw;
 | |
|         }
 | |
|         finally
 | |
|         {
 | |
|             if (stream != null)
 | |
|             {
 | |
|                 _activeDownloadStreams.Remove(stream);
 | |
|                 await stream.DisposeAsync().ConfigureAwait(false);
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     private async Task DownloadFilesInternal(GameObjectHandler gameObjectHandler, List<FileReplacementData> fileReplacement, CancellationToken ct)
 | |
|     {
 | |
|         Logger.LogDebug("Download start: {id}", gameObjectHandler.Name);
 | |
| 
 | |
|         List<DownloadFileDto> downloadFileInfoFromService =
 | |
|         [
 | |
|             .. await FilesGetSizes(fileReplacement.Select(f => f.Hash).Distinct(StringComparer.Ordinal).ToList(), ct).ConfigureAwait(false),
 | |
|         ];
 | |
| 
 | |
|         Logger.LogDebug("Files with size 0 or less: {files}", string.Join(", ", downloadFileInfoFromService.Where(f => f.Size <= 0).Select(f => f.Hash)));
 | |
| 
 | |
|         CurrentDownloads = downloadFileInfoFromService.Distinct().Select(d => new DownloadFileTransfer(d))
 | |
|             .Where(d => d.CanBeTransferred).ToList();
 | |
| 
 | |
|         foreach (var dto in downloadFileInfoFromService.Where(c => c.IsForbidden))
 | |
|         {
 | |
|             if (!_orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal)))
 | |
|             {
 | |
|                 _orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto));
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         var downloadGroups = CurrentDownloads.Where(f => f.CanBeTransferred).GroupBy(f => f.DownloadUri.Host + ":" + f.DownloadUri.Port, StringComparer.Ordinal);
 | |
| 
 | |
|         foreach (var downloadGroup in downloadGroups)
 | |
|         {
 | |
|             _downloadStatus[downloadGroup.Key] = new FileDownloadStatus()
 | |
|             {
 | |
|                 DownloadStatus = DownloadStatus.Initializing,
 | |
|                 TotalBytes = downloadGroup.Sum(c => c.Total),
 | |
|                 TotalFiles = 1,
 | |
|                 TransferredBytes = 0,
 | |
|                 TransferredFiles = 0
 | |
|             };
 | |
|         }
 | |
| 
 | |
|         Mediator.Publish(new DownloadStartedMessage(gameObjectHandler, _downloadStatus));
 | |
| 
 | |
|         await Parallel.ForEachAsync(downloadGroups, new ParallelOptions()
 | |
|         {
 | |
|             MaxDegreeOfParallelism = downloadGroups.Count(),
 | |
|             CancellationToken = ct,
 | |
|         },
 | |
|         async (fileGroup, token) =>
 | |
|         {
 | |
|             // let server predownload files
 | |
|             var requestIdResponse = await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.RequestEnqueueFullPath(fileGroup.First().DownloadUri),
 | |
|                 fileGroup.Select(c => c.Hash), token).ConfigureAwait(false);
 | |
|             Logger.LogDebug("Sent request for {n} files on server {uri} with result {result}", fileGroup.Count(), fileGroup.First().DownloadUri,
 | |
|                 await requestIdResponse.Content.ReadAsStringAsync(token).ConfigureAwait(false));
 | |
| 
 | |
|             Guid requestId = Guid.Parse((await requestIdResponse.Content.ReadAsStringAsync().ConfigureAwait(false)).Trim('"'));
 | |
| 
 | |
|             Logger.LogDebug("GUID {requestId} for {n} files on server {uri}", requestId, fileGroup.Count(), fileGroup.First().DownloadUri);
 | |
| 
 | |
|             var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk");
 | |
|             try
 | |
|             {
 | |
|                 _downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForSlot;
 | |
|                 await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false);
 | |
|                 _downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForQueue;
 | |
|                 Progress<long> progress = new((bytesDownloaded) =>
 | |
|                 {
 | |
|                     try
 | |
|                     {
 | |
|                         if (!_downloadStatus.TryGetValue(fileGroup.Key, out FileDownloadStatus? value)) return;
 | |
|                         value.TransferredBytes += bytesDownloaded;
 | |
|                     }
 | |
|                     catch (Exception ex)
 | |
|                     {
 | |
|                         Logger.LogWarning(ex, "Could not set download progress");
 | |
|                     }
 | |
|                 });
 | |
|                 await DownloadAndMungeFileHttpClient(fileGroup.Key, requestId, [.. fileGroup], blockFile, progress, token).ConfigureAwait(false);
 | |
|             }
 | |
|             catch (OperationCanceledException)
 | |
|             {
 | |
|                 _orchestrator.ReleaseDownloadSlot();
 | |
|                 File.Delete(blockFile);
 | |
|                 Logger.LogDebug("Detected cancellation, removing {id}", gameObjectHandler);
 | |
|                 CancelDownload();
 | |
|                 return;
 | |
|             }
 | |
|             catch (Exception ex)
 | |
|             {
 | |
|                 _orchestrator.ReleaseDownloadSlot();
 | |
|                 File.Delete(blockFile);
 | |
|                 Logger.LogError(ex, "Error during download of {id}", requestId);
 | |
|                 CancelDownload();
 | |
|                 return;
 | |
|             }
 | |
| 
 | |
|             FileStream? fileBlockStream = null;
 | |
|             try
 | |
|             {
 | |
|                 _downloadStatus[fileGroup.Key].TransferredFiles = 1;
 | |
|                 _downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.Decompressing;
 | |
|                 fileBlockStream = File.OpenRead(blockFile);
 | |
|                 while (fileBlockStream.Position < fileBlockStream.Length)
 | |
|                 {
 | |
|                     (string fileHash, long fileLengthBytes) = ReadBlockFileHeader(fileBlockStream);
 | |
| 
 | |
|                     try
 | |
|                     {
 | |
|                         Logger.LogDebug("Found file {file} with length {le}, decompressing download", fileHash, fileLengthBytes);
 | |
|                         var fileExtension = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase)).GamePaths[0].Split(".")[^1];
 | |
| 
 | |
|                         byte[] compressedFileContent = new byte[fileLengthBytes];
 | |
|                         _ = await fileBlockStream.ReadAsync(compressedFileContent, token).ConfigureAwait(false);
 | |
|                         MungeBuffer(compressedFileContent);
 | |
| 
 | |
|                         var decompressedFile = LZ4Codec.Unwrap(compressedFileContent);
 | |
|                         var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension);
 | |
|                         await _fileCompactor.WriteAllBytesAsync(filePath, decompressedFile, token).ConfigureAwait(false);
 | |
| 
 | |
|                         PersistFileToStorage(fileHash, filePath);
 | |
|                     }
 | |
|                     catch (Exception e)
 | |
|                     {
 | |
|                         Logger.LogWarning(e, "Error during decompression");
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
|             catch (Exception ex)
 | |
|             {
 | |
|                 Logger.LogError(ex, "Error during block file read");
 | |
|             }
 | |
|             finally
 | |
|             {
 | |
|                 _orchestrator.ReleaseDownloadSlot();
 | |
|                 if (fileBlockStream != null)
 | |
|                     await fileBlockStream.DisposeAsync().ConfigureAwait(false);
 | |
|                 File.Delete(blockFile);
 | |
|             }
 | |
|         }).ConfigureAwait(false);
 | |
| 
 | |
|         Logger.LogDebug("Download end: {id}", gameObjectHandler);
 | |
| 
 | |
|         CancelDownload();
 | |
|     }
 | |
| 
 | |
|     private async Task<List<DownloadFileDto>> FilesGetSizes(List<string> hashes, CancellationToken ct)
 | |
|     {
 | |
|         if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized");
 | |
|         var response = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.ServerFilesGetSizesFullPath(_orchestrator.FilesCdnUri!), hashes, ct).ConfigureAwait(false);
 | |
|         return await response.Content.ReadFromJsonAsync<List<DownloadFileDto>>(cancellationToken: ct).ConfigureAwait(false) ?? [];
 | |
|     }
 | |
| 
 | |
|     private void PersistFileToStorage(string fileHash, string filePath)
 | |
|     {
 | |
|         var fi = new FileInfo(filePath);
 | |
|         Func<DateTime> RandomDayInThePast()
 | |
|         {
 | |
|             DateTime start = new(1995, 1, 1, 1, 1, 1, DateTimeKind.Local);
 | |
|             Random gen = new();
 | |
|             int range = (DateTime.Today - start).Days;
 | |
|             return () => start.AddDays(gen.Next(range));
 | |
|         }
 | |
| 
 | |
|         fi.CreationTime = RandomDayInThePast().Invoke();
 | |
|         fi.LastAccessTime = DateTime.Today;
 | |
|         fi.LastWriteTime = RandomDayInThePast().Invoke();
 | |
|         try
 | |
|         {
 | |
|             var entry = _fileDbManager.CreateCacheEntry(filePath);
 | |
|             if (entry != null && !string.Equals(entry.Hash, fileHash, StringComparison.OrdinalIgnoreCase))
 | |
|             {
 | |
|                 Logger.LogError("Hash mismatch after extracting, got {hash}, expected {expectedHash}, deleting file", entry.Hash, fileHash);
 | |
|                 File.Delete(filePath);
 | |
|                 _fileDbManager.RemoveHashedFile(entry.Hash, entry.PrefixedFilePath);
 | |
|             }
 | |
|         }
 | |
|         catch (Exception ex)
 | |
|         {
 | |
|             Logger.LogWarning(ex, "Error creating cache entry");
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     private async Task WaitForDownloadReady(List<DownloadFileTransfer> downloadFileTransfer, Guid requestId, CancellationToken downloadCt)
 | |
|     {
 | |
|         bool alreadyCancelled = false;
 | |
|         try
 | |
|         {
 | |
|             CancellationTokenSource localTimeoutCts = new();
 | |
|             localTimeoutCts.CancelAfter(TimeSpan.FromSeconds(5));
 | |
|             CancellationTokenSource composite = CancellationTokenSource.CreateLinkedTokenSource(downloadCt, localTimeoutCts.Token);
 | |
| 
 | |
|             while (!_orchestrator.IsDownloadReady(requestId))
 | |
|             {
 | |
|                 try
 | |
|                 {
 | |
|                     await Task.Delay(250, composite.Token).ConfigureAwait(false);
 | |
|                 }
 | |
|                 catch (TaskCanceledException)
 | |
|                 {
 | |
|                     if (downloadCt.IsCancellationRequested) throw;
 | |
| 
 | |
|                     var req = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCheckQueueFullPath(downloadFileTransfer[0].DownloadUri, requestId),
 | |
|                         downloadFileTransfer.Select(c => c.Hash).ToList(), downloadCt).ConfigureAwait(false);
 | |
|                     req.EnsureSuccessStatusCode();
 | |
|                     localTimeoutCts.Dispose();
 | |
|                     composite.Dispose();
 | |
|                     localTimeoutCts = new();
 | |
|                     localTimeoutCts.CancelAfter(TimeSpan.FromSeconds(5));
 | |
|                     composite = CancellationTokenSource.CreateLinkedTokenSource(downloadCt, localTimeoutCts.Token);
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             localTimeoutCts.Dispose();
 | |
|             composite.Dispose();
 | |
| 
 | |
|             Logger.LogDebug("Download {requestId} ready", requestId);
 | |
|         }
 | |
|         catch (TaskCanceledException)
 | |
|         {
 | |
|             try
 | |
|             {
 | |
|                 await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false);
 | |
|                 alreadyCancelled = true;
 | |
|             }
 | |
|             catch
 | |
|             {
 | |
|                 // ignore whatever happens here
 | |
|             }
 | |
| 
 | |
|             throw;
 | |
|         }
 | |
|         finally
 | |
|         {
 | |
|             if (downloadCt.IsCancellationRequested && !alreadyCancelled)
 | |
|             {
 | |
|                 try
 | |
|                 {
 | |
|                     await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false);
 | |
|                 }
 | |
|                 catch
 | |
|                 {
 | |
|                     // ignore whatever happens here
 | |
|                 }
 | |
|             }
 | |
|             _orchestrator.ClearDownloadRequest(requestId);
 | |
|         }
 | |
|     }
 | |
| } | 
