From 4e2df4bbc4a4b4580fb883d88c87ccaf444093cc Mon Sep 17 00:00:00 2001 From: rootdarkarchon Date: Tue, 1 Aug 2023 16:48:16 +0200 Subject: [PATCH] Single file transfers (#56) * move to single file transfer and extraction per server download * clean up downloads --------- Co-authored-by: rootdarkarchon --- MareAPI | 2 +- .../FileCache/PeriodicFileScanner.cs | 2 +- .../PlayerData/Handlers/PairHandler.cs | 1 + .../WebAPI/Files/FileDownloadManager.cs | 200 +++++++++++------- 4 files changed, 127 insertions(+), 78 deletions(-) diff --git a/MareAPI b/MareAPI index a5373bc..4aacbb7 160000 --- a/MareAPI +++ b/MareAPI @@ -1 +1 @@ -Subproject commit a5373bca246ee3dab590a2eac68d974ba546ee8c +Subproject commit 4aacbb78bb2793ae4e3f0f1b809e491e2cae7567 diff --git a/MareSynchronos/FileCache/PeriodicFileScanner.cs b/MareSynchronos/FileCache/PeriodicFileScanner.cs index f36abb9..9a634ae 100644 --- a/MareSynchronos/FileCache/PeriodicFileScanner.cs +++ b/MareSynchronos/FileCache/PeriodicFileScanner.cs @@ -63,7 +63,7 @@ public sealed class PeriodicFileScanner : DisposableMediatorSubscriberBase _scanCancellationTokenSource?.Cancel(); _scanCancellationTokenSource = new CancellationTokenSource(); var token = _scanCancellationTokenSource.Token; - Task.Run(async () => + _ = Task.Run(async () => { while (!token.IsCancellationRequested) { diff --git a/MareSynchronos/PlayerData/Handlers/PairHandler.cs b/MareSynchronos/PlayerData/Handlers/PairHandler.cs index 7d8cced..f222b93 100644 --- a/MareSynchronos/PlayerData/Handlers/PairHandler.cs +++ b/MareSynchronos/PlayerData/Handlers/PairHandler.cs @@ -59,6 +59,7 @@ public sealed class PairHandler : DisposableMediatorSubscriberBase Mediator.Subscribe(this, (_) => FrameworkUpdate()); Mediator.Subscribe(this, (_) => { + _downloadCancellationTokenSource?.CancelDispose(); MediatorUnsubscribeFromCharacterChanged(); _charaHandler?.Invalidate(); IsVisible = false; diff --git a/MareSynchronos/WebAPI/Files/FileDownloadManager.cs b/MareSynchronos/WebAPI/Files/FileDownloadManager.cs index 26b7f9e..277ce53 100644 --- a/MareSynchronos/WebAPI/Files/FileDownloadManager.cs +++ b/MareSynchronos/WebAPI/Files/FileDownloadManager.cs @@ -77,20 +77,41 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase base.Dispose(disposing); } - private async Task DownloadFileHttpClient(string downloadGroup, DownloadFileTransfer fileTransfer, string tempPath, IProgress progress, CancellationToken ct) + private static (string fileHash, long fileLengthBytes) ReadBlockFileHeader(FileStream fileBlockStream) { - var requestId = await GetQueueRequest(fileTransfer, ct).ConfigureAwait(false); + List hashName = new(); + List fileLength = new(); + var separator = (char)fileBlockStream.ReadByte(); + if (separator != '#') throw new InvalidDataException("Data is invalid, first char is not #"); - Logger.LogDebug("GUID {requestId} for file {hash} on server {uri}", requestId, fileTransfer.Hash, fileTransfer.DownloadUri); + bool readHash = false; + while (true) + { + var readChar = (char)fileBlockStream.ReadByte(); + if (readChar == ':') + { + readHash = true; + continue; + } + if (readChar == '#') break; + if (!readHash) hashName.Add(readChar); + else fileLength.Add(readChar); + } + return (string.Join("", hashName), long.Parse(string.Join("", fileLength))); + } + + private async Task DownloadFileHttpClient(string downloadGroup, Guid requestId, List fileTransfer, string tempPath, IProgress progress, CancellationToken ct) + { + Logger.LogDebug("GUID {requestId} on server {uri} for files {files}", requestId, fileTransfer[0].DownloadUri, string.Join(", ", fileTransfer.Select(c => c.Hash).ToList())); await WaitForDownloadReady(fileTransfer, requestId, ct).ConfigureAwait(false); _downloadStatus[downloadGroup].DownloadStatus = DownloadStatus.Downloading; HttpResponseMessage response = null!; - var requestUrl = MareFiles.CacheGetFullPath(fileTransfer.DownloadUri, requestId); + var requestUrl = MareFiles.CacheGetFullPath(fileTransfer[0].DownloadUri, requestId); - Logger.LogDebug("Downloading {requestUrl} for file {hash}", requestUrl, fileTransfer.Hash); + Logger.LogDebug("Downloading {requestUrl} for request {id}", requestUrl, requestId); try { response = await _orchestrator.SendRequestAsync(HttpMethod.Get, requestUrl, ct).ConfigureAwait(false); @@ -144,7 +165,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase private async Task DownloadFilesInternal(GameObjectHandler gameObjectHandler, List fileReplacement, CancellationToken ct) { - Logger.LogDebug("Downloading files for {id}", gameObjectHandler.Name); + Logger.LogDebug("Download start: {id}", gameObjectHandler.Name); List downloadFileInfoFromService = new(); downloadFileInfoFromService.AddRange(await FilesGetSizes(fileReplacement.Select(f => f.Hash).ToList(), ct).ConfigureAwait(false)); @@ -156,7 +177,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase foreach (var dto in downloadFileInfoFromService.Where(c => c.IsForbidden)) { - if (!_orchestrator.ForbiddenTransfers.Any(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal))) + if (!_orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal))) { _orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto)); } @@ -170,7 +191,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase { DownloadStatus = DownloadStatus.Initializing, TotalBytes = downloadGroup.Sum(c => c.Total), - TotalFiles = downloadGroup.Count(), + TotalFiles = 1, TransferredBytes = 0, TransferredFiles = 0 }; @@ -186,88 +207,96 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase async (fileGroup, token) => { // let server predownload files - await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.RequestEnqueueFullPath(fileGroup.First().DownloadUri), + var requestIdResponse = await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.RequestEnqueueFullPath(fileGroup.First().DownloadUri), fileGroup.Select(c => c.Hash), token).ConfigureAwait(false); + Logger.LogDebug("Sent request for {n} files on server {uri} with result {result}", fileGroup.Count(), fileGroup.First().DownloadUri, requestIdResponse.Content.ReadAsStringAsync().Result); - foreach (var file in fileGroup) + Guid requestId = Guid.Parse(requestIdResponse.Content.ReadAsStringAsync().Result.Trim('"')); + _downloadReady[requestId] = false; + + Logger.LogDebug("GUID {requestId} for {n} files on server {uri}", requestId, fileGroup.Count(), fileGroup.First().DownloadUri); + + var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk", true); + try { - var ext = fileReplacement.First(f => string.Equals(f.Hash, file.Hash, StringComparison.OrdinalIgnoreCase)).GamePaths.First().Split(".").Last(); - var tempPath = _fileDbManager.GetCacheFilePath(file.Hash, ext, isTemporaryFile: true); + _downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForSlot; + await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false); + _downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForQueue; Progress progress = new((bytesDownloaded) => { try { if (!_downloadStatus.ContainsKey(fileGroup.Key)) return; _downloadStatus[fileGroup.Key].TransferredBytes += bytesDownloaded; - file.Transferred += bytesDownloaded; } catch (Exception ex) { Logger.LogWarning(ex, "Could not set download progress"); } }); + await DownloadFileHttpClient(fileGroup.Key, requestId, fileGroup.ToList(), blockFile, progress, token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + _orchestrator.ReleaseDownloadSlot(); + File.Delete(blockFile); + Logger.LogDebug("Detected cancellation, removing {id}", gameObjectHandler); + CancelDownload(); + return; + } + catch (Exception ex) + { + _orchestrator.ReleaseDownloadSlot(); + File.Delete(blockFile); + Logger.LogError(ex, "Error during download of {id}", requestId); + CancelDownload(); + return; + } - try - { - _downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForSlot; - await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false); - _downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForQueue; - await DownloadFileHttpClient(fileGroup.Key, file, tempPath, progress, token).ConfigureAwait(false); - _downloadStatus[fileGroup.Key].TransferredFiles += 1; - } - catch (OperationCanceledException) - { - File.Delete(tempPath); - Logger.LogDebug("Detected cancellation, removing {id}", gameObjectHandler); - CancelDownload(); - return; - } - catch (Exception ex) - { - Logger.LogError(ex, "Error during download of {hash}", file.Hash); - continue; - } - finally - { - _orchestrator.ReleaseDownloadSlot(); - } - + FileStream? fileBlockStream = null; + try + { + _downloadStatus[fileGroup.Key].TransferredFiles = 1; _downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.Decompressing; - var tempFileData = await File.ReadAllBytesAsync(tempPath, token).ConfigureAwait(false); - var extractedFile = LZ4Codec.Unwrap(tempFileData); - File.Delete(tempPath); - var filePath = _fileDbManager.GetCacheFilePath(file.Hash, ext, isTemporaryFile: false); - await File.WriteAllBytesAsync(filePath, extractedFile, token).ConfigureAwait(false); - var fi = new FileInfo(filePath); - Func RandomDayInThePast() + fileBlockStream = File.OpenRead(blockFile); + while (fileBlockStream.Position < fileBlockStream.Length) { - DateTime start = new(1995, 1, 1); - Random gen = new(); - int range = (DateTime.Today - start).Days; - return () => start.AddDays(gen.Next(range)); - } + (string fileHash, long fileLengthBytes) = ReadBlockFileHeader(fileBlockStream); - fi.CreationTime = RandomDayInThePast().Invoke(); - fi.LastAccessTime = DateTime.Today; - fi.LastWriteTime = RandomDayInThePast().Invoke(); - try - { - var entry = _fileDbManager.CreateCacheEntry(filePath); - if (!string.Equals(entry?.Hash, file.Hash, StringComparison.OrdinalIgnoreCase)) + try { - Logger.LogError("Hash mismatch after extracting, got {hash}, expected {expectedHash}, deleting file", entry?.Hash, file.Hash); - File.Delete(filePath); - _fileDbManager.RemoveHashedFile(entry); + Logger.LogDebug("Found file {file} with length {le}, decompressing download", fileHash, fileLengthBytes); + var fileExtension = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase)).GamePaths[0].Split(".").Last(); + + byte[] compressedFileContent = new byte[fileLengthBytes]; + _ = await fileBlockStream.ReadAsync(compressedFileContent, token).ConfigureAwait(false); + + var decompressedFile = LZ4Codec.Unwrap(compressedFileContent); + var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension, false); + await File.WriteAllBytesAsync(filePath, decompressedFile, token).ConfigureAwait(false); + + PersistFileToStorage(fileHash, filePath); + } + catch (Exception e) + { + Logger.LogWarning(e, "Error during decompression"); } } - catch (Exception ex) - { - Logger.LogWarning(ex, "Issue creating cache entry"); - } + } + catch (Exception ex) + { + Logger.LogError(ex, "Error during block file read"); + } + finally + { + _orchestrator.ReleaseDownloadSlot(); + fileBlockStream?.Dispose(); + File.Delete(blockFile); } }).ConfigureAwait(false); - Logger.LogDebug("Download for {id} complete", gameObjectHandler); + Logger.LogDebug("Download end: {id}", gameObjectHandler); + CancelDownload(); } @@ -278,19 +307,37 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase return await response.Content.ReadFromJsonAsync>(cancellationToken: ct).ConfigureAwait(false) ?? new List(); } - private async Task GetQueueRequest(DownloadFileTransfer downloadFileTransfer, CancellationToken ct) + private void PersistFileToStorage(string fileHash, string filePath) { - var response = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestRequestFileFullPath(downloadFileTransfer.DownloadUri, downloadFileTransfer.Hash), ct).ConfigureAwait(false); - var responseString = await response.Content.ReadAsStringAsync(ct).ConfigureAwait(false); - var requestId = Guid.Parse(responseString.Trim('"')); - if (!_downloadReady.ContainsKey(requestId)) + var fi = new FileInfo(filePath); + Func RandomDayInThePast() { - _downloadReady[requestId] = false; + DateTime start = new(1995, 1, 1); + Random gen = new(); + int range = (DateTime.Today - start).Days; + return () => start.AddDays(gen.Next(range)); + } + + fi.CreationTime = RandomDayInThePast().Invoke(); + fi.LastAccessTime = DateTime.Today; + fi.LastWriteTime = RandomDayInThePast().Invoke(); + try + { + var entry = _fileDbManager.CreateCacheEntry(filePath); + if (!string.Equals(entry?.Hash, fileHash, StringComparison.OrdinalIgnoreCase)) + { + Logger.LogError("Hash mismatch after extracting, got {hash}, expected {expectedHash}, deleting file", entry?.Hash, fileHash); + File.Delete(filePath); + _fileDbManager.RemoveHashedFile(entry); + } + } + catch (Exception ex) + { + Logger.LogWarning(ex, "Error creating cache entry"); } - return requestId; } - private async Task WaitForDownloadReady(DownloadFileTransfer downloadFileTransfer, Guid requestId, CancellationToken downloadCt) + private async Task WaitForDownloadReady(List downloadFileTransfer, Guid requestId, CancellationToken downloadCt) { bool alreadyCancelled = false; try @@ -309,7 +356,8 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase { if (downloadCt.IsCancellationRequested) throw; - var req = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCheckQueueFullPath(downloadFileTransfer.DownloadUri, requestId, downloadFileTransfer.Hash), downloadCt).ConfigureAwait(false); + var req = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCheckQueueFullPath(downloadFileTransfer[0].DownloadUri, requestId), + downloadFileTransfer.Select(c => c.Hash).ToList(), downloadCt).ConfigureAwait(false); req.EnsureSuccessStatusCode(); localTimeoutCts.Dispose(); composite.Dispose(); @@ -328,7 +376,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase { try { - await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer.DownloadUri, requestId)).ConfigureAwait(false); + await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false); alreadyCancelled = true; } catch @@ -344,7 +392,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase { try { - await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer.DownloadUri, requestId)).ConfigureAwait(false); + await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false); } catch {