Files
ClubPenguinClient/MareSynchronos/WebAPI/Files/FileDownloadManager.cs
Loporrit cc3dc02afb Revert the download change to see if it fixes the download bars
Revert "Fix hung downloads"

This reverts commit 0b7381f486.

Revert "Avoid missing blk file errors on early download abort"

This reverts commit e4e24d1831.
2025-06-28 17:26:03 +00:00

511 lines
21 KiB
C#

using Dalamud.Utility;
using K4os.Compression.LZ4.Streams;
using MareSynchronos.API.Data;
using MareSynchronos.API.Dto.Files;
using MareSynchronos.API.Routes;
using MareSynchronos.FileCache;
using MareSynchronos.PlayerData.Handlers;
using MareSynchronos.Services.Mediator;
using MareSynchronos.Utils;
using MareSynchronos.WebAPI.Files.Models;
using MareSynchronosStaticFilesServer.Utils;
using Microsoft.Extensions.Logging;
using System.Net;
using System.Net.Http.Json;
using System.Security.Cryptography;
namespace MareSynchronos.WebAPI.Files;
public partial class FileDownloadManager : DisposableMediatorSubscriberBase
{
private readonly Dictionary<string, FileDownloadStatus> _downloadStatus;
private readonly FileCompactor _fileCompactor;
private readonly FileCacheManager _fileDbManager;
private readonly FileTransferOrchestrator _orchestrator;
private readonly List<ThrottledStream> _activeDownloadStreams;
public FileDownloadManager(ILogger<FileDownloadManager> logger, MareMediator mediator,
FileTransferOrchestrator orchestrator,
FileCacheManager fileCacheManager, FileCompactor fileCompactor) : base(logger, mediator)
{
_downloadStatus = new Dictionary<string, FileDownloadStatus>(StringComparer.Ordinal);
_orchestrator = orchestrator;
_fileDbManager = fileCacheManager;
_fileCompactor = fileCompactor;
_activeDownloadStreams = [];
Mediator.Subscribe<DownloadLimitChangedMessage>(this, (msg) =>
{
if (!_activeDownloadStreams.Any()) return;
var newLimit = _orchestrator.DownloadLimitPerSlot();
Logger.LogTrace("Setting new Download Speed Limit to {newLimit}", newLimit);
foreach (var stream in _activeDownloadStreams)
{
stream.BandwidthLimit = newLimit;
}
});
}
public List<DownloadFileTransfer> CurrentDownloads { get; private set; } = [];
public List<FileTransfer> ForbiddenTransfers => _orchestrator.ForbiddenTransfers;
public bool IsDownloading => !CurrentDownloads.Any();
public void ClearDownload()
{
CurrentDownloads.Clear();
_downloadStatus.Clear();
}
public async Task DownloadFiles(GameObjectHandler gameObject, List<FileReplacementData> fileReplacementDto, CancellationToken ct)
{
Mediator.Publish(new HaltScanMessage(nameof(DownloadFiles)));
try
{
await DownloadFilesInternal(gameObject, fileReplacementDto, ct).ConfigureAwait(false);
}
catch
{
ClearDownload();
}
finally
{
Mediator.Publish(new DownloadFinishedMessage(gameObject));
Mediator.Publish(new ResumeScanMessage(nameof(DownloadFiles)));
}
}
protected override void Dispose(bool disposing)
{
ClearDownload();
foreach (var stream in _activeDownloadStreams.ToList())
{
try
{
stream.Dispose();
}
catch
{
// do nothing
//
}
}
base.Dispose(disposing);
}
private static byte ConvertReadByte(int byteOrEof)
{
if (byteOrEof == -1)
{
throw new EndOfStreamException();
}
return (byte)byteOrEof;
}
private static (string fileHash, long fileLengthBytes) ReadBlockFileHeader(FileStream fileBlockStream)
{
List<char> hashName = [];
List<char> fileLength = [];
var separator = (char)ConvertReadByte(fileBlockStream.ReadByte());
if (separator != '#') throw new InvalidDataException("Data is invalid, first char is not #");
bool readHash = false;
while (true)
{
int readByte = fileBlockStream.ReadByte();
if (readByte == -1)
throw new EndOfStreamException();
var readChar = (char)ConvertReadByte(readByte);
if (readChar == ':')
{
readHash = true;
continue;
}
if (readChar == '#') break;
if (!readHash) hashName.Add(readChar);
else fileLength.Add(readChar);
}
if (fileLength.Count == 0)
fileLength.Add('0');
return (string.Join("", hashName), long.Parse(string.Join("", fileLength)));
}
private async Task DownloadAndMungeFileHttpClient(string downloadGroup, Guid requestId, List<DownloadFileTransfer> fileTransfer, string tempPath, IProgress<long> progress, CancellationToken ct)
{
Logger.LogDebug("GUID {requestId} on server {uri} for files {files}", requestId, fileTransfer[0].DownloadUri, string.Join(", ", fileTransfer.Select(c => c.Hash).ToList()));
await WaitForDownloadReady(fileTransfer, requestId, ct).ConfigureAwait(false);
_downloadStatus[downloadGroup].DownloadStatus = DownloadStatus.Downloading;
HttpResponseMessage response = null!;
var requestUrl = MareFiles.CacheGetFullPath(fileTransfer[0].DownloadUri, requestId);
Logger.LogDebug("Downloading {requestUrl} for request {id}", requestUrl, requestId);
try
{
response = await _orchestrator.SendRequestAsync(HttpMethod.Get, requestUrl, ct, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
response.EnsureSuccessStatusCode();
}
catch (HttpRequestException ex)
{
Logger.LogWarning(ex, "Error during download of {requestUrl}, HttpStatusCode: {code}", requestUrl, ex.StatusCode);
if (ex.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.Unauthorized)
{
throw new InvalidDataException($"Http error {ex.StatusCode} (cancelled: {ct.IsCancellationRequested}): {requestUrl}", ex);
}
}
ThrottledStream? stream = null;
try
{
var fileStream = File.Create(tempPath);
await using (fileStream.ConfigureAwait(false))
{
var bufferSize = response.Content.Headers.ContentLength > 1024 * 1024 ? 65536 : 8196;
var buffer = new byte[bufferSize];
var bytesRead = 0;
var limit = _orchestrator.DownloadLimitPerSlot();
Logger.LogTrace("Starting Download of {id} with a speed limit of {limit} to {tempPath}", requestId, limit, tempPath);
stream = new ThrottledStream(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit);
_activeDownloadStreams.Add(stream);
while ((bytesRead = await stream.ReadAsync(buffer, ct).ConfigureAwait(false)) > 0)
{
ct.ThrowIfCancellationRequested();
await fileStream.WriteAsync(buffer.AsMemory(0, bytesRead), ct).ConfigureAwait(false);
progress.Report(bytesRead);
}
Logger.LogDebug("{requestUrl} downloaded to {tempPath}", requestUrl, tempPath);
}
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
try
{
if (!tempPath.IsNullOrEmpty())
File.Delete(tempPath);
}
catch
{
// ignore if file deletion fails
}
throw;
}
finally
{
if (stream != null)
{
_activeDownloadStreams.Remove(stream);
await stream.DisposeAsync().ConfigureAwait(false);
}
}
}
public async Task<List<DownloadFileTransfer>> InitiateDownloadList(GameObjectHandler gameObjectHandler, List<FileReplacementData> fileReplacement, CancellationToken ct)
{
Logger.LogDebug("Download start: {id}", gameObjectHandler.Name);
List<DownloadFileDto> downloadFileInfoFromService =
[
.. await FilesGetSizes(fileReplacement.Select(f => f.Hash).Distinct(StringComparer.Ordinal).ToList(), ct).ConfigureAwait(false),
];
Logger.LogDebug("Files with size 0 or less: {files}", string.Join(", ", downloadFileInfoFromService.Where(f => f.Size <= 0).Select(f => f.Hash)));
foreach (var dto in downloadFileInfoFromService.Where(c => c.IsForbidden))
{
if (!_orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal)))
{
_orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto));
}
}
CurrentDownloads = downloadFileInfoFromService.Distinct().Select(d => new DownloadFileTransfer(d))
.Where(d => d.CanBeTransferred).ToList();
return CurrentDownloads;
}
private async Task DownloadFilesInternal(GameObjectHandler gameObjectHandler, List<FileReplacementData> fileReplacement, CancellationToken ct)
{
var downloadGroups = CurrentDownloads.GroupBy(f => f.DownloadUri.Host + ":" + f.DownloadUri.Port, StringComparer.Ordinal);
foreach (var downloadGroup in downloadGroups)
{
_downloadStatus[downloadGroup.Key] = new FileDownloadStatus()
{
DownloadStatus = DownloadStatus.Initializing,
TotalBytes = downloadGroup.Sum(c => c.Total),
TotalFiles = 1,
TransferredBytes = 0,
TransferredFiles = 0
};
}
Mediator.Publish(new DownloadStartedMessage(gameObjectHandler, _downloadStatus));
await Parallel.ForEachAsync(downloadGroups, new ParallelOptions()
{
MaxDegreeOfParallelism = downloadGroups.Count(),
CancellationToken = ct,
},
async (fileGroup, token) =>
{
// let server predownload files
var requestIdResponse = await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.RequestEnqueueFullPath(fileGroup.First().DownloadUri),
fileGroup.Select(c => c.Hash), token).ConfigureAwait(false);
Logger.LogDebug("Sent request for {n} files on server {uri} with result {result}", fileGroup.Count(), fileGroup.First().DownloadUri,
await requestIdResponse.Content.ReadAsStringAsync(token).ConfigureAwait(false));
Guid requestId = Guid.Parse((await requestIdResponse.Content.ReadAsStringAsync().ConfigureAwait(false)).Trim('"'));
Logger.LogDebug("GUID {requestId} for {n} files on server {uri}", requestId, fileGroup.Count(), fileGroup.First().DownloadUri);
var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk");
FileInfo fi = new(blockFile);
try
{
_downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForSlot;
await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false);
_downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForQueue;
Progress<long> progress = new((bytesDownloaded) =>
{
try
{
if (!_downloadStatus.TryGetValue(fileGroup.Key, out FileDownloadStatus? value)) return;
value.TransferredBytes += bytesDownloaded;
}
catch (Exception ex)
{
Logger.LogWarning(ex, "Could not set download progress");
}
});
await DownloadAndMungeFileHttpClient(fileGroup.Key, requestId, [.. fileGroup], blockFile, progress, token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
Logger.LogDebug("{dlName}: Detected cancellation of download, partially extracting files for {id}", fi.Name, gameObjectHandler);
}
catch (Exception ex)
{
_orchestrator.ReleaseDownloadSlot();
File.Delete(blockFile);
Logger.LogError(ex, "{dlName}: Error during download of {id}", fi.Name, requestId);
ClearDownload();
return;
}
FileStream? fileBlockStream = null;
var threadCount = Math.Clamp((int)(Environment.ProcessorCount / 2.0f), 2, 8);
var tasks = new List<Task>();
try
{
if (_downloadStatus.TryGetValue(fileGroup.Key, out var status))
{
status.TransferredFiles = 1;
status.DownloadStatus = DownloadStatus.Decompressing;
}
fileBlockStream = File.OpenRead(blockFile);
while (fileBlockStream.Position < fileBlockStream.Length)
{
(string fileHash, long fileLengthBytes) = ReadBlockFileHeader(fileBlockStream);
var chunkPosition = fileBlockStream.Position;
fileBlockStream.Position += fileLengthBytes;
while (tasks.Count > threadCount && tasks.Where(t => !t.IsCompleted).Count() > 4)
await Task.Delay(10, CancellationToken.None);
var fileExtension = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase)).GamePaths[0].Split(".")[^1];
var tmpPath = _fileDbManager.GetCacheFilePath(Guid.NewGuid().ToString(), "tmp");
var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension);
Logger.LogDebug("{dlName}: Decompressing {file}:{le} => {dest}", fi.Name, fileHash, fileLengthBytes, filePath);
tasks.Add(Task.Run(() => {
try
{
using var tmpFileStream = new HashingStream(new FileStream(tmpPath, new FileStreamOptions()
{
Mode = FileMode.CreateNew,
Access = FileAccess.Write,
Share = FileShare.None
}), SHA1.Create());
using var fileChunkStream = new FileStream(blockFile, new FileStreamOptions()
{
BufferSize = 80000,
Mode = FileMode.Open,
Access = FileAccess.Read
});
fileChunkStream.Position = chunkPosition;
using var innerFileStream = new LimitedStream(fileChunkStream, fileLengthBytes);
using var decoder = LZ4Frame.Decode(innerFileStream);
long startPos = fileChunkStream.Position;
decoder.AsStream().CopyTo(tmpFileStream);
long readBytes = fileChunkStream.Position - startPos;
if (readBytes != fileLengthBytes)
{
throw new EndOfStreamException();
}
string calculatedHash = BitConverter.ToString(tmpFileStream.Finish()).Replace("-", "", StringComparison.Ordinal);
if (calculatedHash != fileHash)
{
Logger.LogError("Hash mismatch after extracting, got {hash}, expected {expectedHash}, deleting file", calculatedHash, fileHash);
return;
}
tmpFileStream.Close();
_fileCompactor.RenameAndCompact(filePath, tmpPath);
PersistFileToStorage(fileHash, filePath, fileLengthBytes);
}
catch (EndOfStreamException)
{
Logger.LogWarning("{dlName}: Failure to extract file {fileHash}, stream ended prematurely", fi.Name, fileHash);
}
catch (Exception e)
{
Logger.LogWarning(e, "{dlName}: Error during decompression of {hash}", fi.Name, fileHash);
foreach (var fr in fileReplacement)
Logger.LogWarning(" - {h}: {x}", fr.Hash, fr.GamePaths[0]);
}
finally
{
if (File.Exists(tmpPath))
File.Delete(tmpPath);
}
}));
}
Task.WaitAll([..tasks], CancellationToken.None);
}
catch (EndOfStreamException)
{
Logger.LogDebug("{dlName}: Failure to extract file header data, stream ended", fi.Name);
}
catch (Exception ex)
{
Logger.LogError(ex, "{dlName}: Error during block file read", fi.Name);
}
finally
{
Task.WaitAll([..tasks], CancellationToken.None);
_orchestrator.ReleaseDownloadSlot();
if (fileBlockStream != null)
await fileBlockStream.DisposeAsync().ConfigureAwait(false);
File.Delete(blockFile);
}
}).ConfigureAwait(false);
Logger.LogDebug("Download end: {id}", gameObjectHandler);
ClearDownload();
}
private async Task<List<DownloadFileDto>> FilesGetSizes(List<string> hashes, CancellationToken ct)
{
if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized");
var response = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.ServerFilesGetSizesFullPath(_orchestrator.FilesCdnUri!), hashes, ct).ConfigureAwait(false);
return await response.Content.ReadFromJsonAsync<List<DownloadFileDto>>(cancellationToken: ct).ConfigureAwait(false) ?? [];
}
private void PersistFileToStorage(string fileHash, string filePath, long? compressedSize = null)
{
try
{
var entry = _fileDbManager.CreateCacheEntry(filePath, fileHash);
if (entry != null && !string.Equals(entry.Hash, fileHash, StringComparison.OrdinalIgnoreCase))
{
_fileDbManager.RemoveHashedFile(entry.Hash, entry.PrefixedFilePath);
entry = null;
}
if (entry != null)
entry.CompressedSize = compressedSize;
}
catch (Exception ex)
{
Logger.LogWarning(ex, "Error creating cache entry");
}
}
private async Task WaitForDownloadReady(List<DownloadFileTransfer> downloadFileTransfer, Guid requestId, CancellationToken downloadCt)
{
bool alreadyCancelled = false;
try
{
CancellationTokenSource localTimeoutCts = new();
localTimeoutCts.CancelAfter(TimeSpan.FromSeconds(5));
CancellationTokenSource composite = CancellationTokenSource.CreateLinkedTokenSource(downloadCt, localTimeoutCts.Token);
while (!_orchestrator.IsDownloadReady(requestId))
{
try
{
await Task.Delay(250, composite.Token).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
if (downloadCt.IsCancellationRequested) throw;
var req = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCheckQueueFullPath(downloadFileTransfer[0].DownloadUri, requestId),
downloadFileTransfer.Select(c => c.Hash).ToList(), downloadCt).ConfigureAwait(false);
req.EnsureSuccessStatusCode();
localTimeoutCts.Dispose();
composite.Dispose();
localTimeoutCts = new();
localTimeoutCts.CancelAfter(TimeSpan.FromSeconds(5));
composite = CancellationTokenSource.CreateLinkedTokenSource(downloadCt, localTimeoutCts.Token);
}
}
localTimeoutCts.Dispose();
composite.Dispose();
Logger.LogDebug("Download {requestId} ready", requestId);
}
catch (TaskCanceledException)
{
try
{
await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false);
alreadyCancelled = true;
}
catch
{
// ignore whatever happens here
}
throw;
}
finally
{
if (downloadCt.IsCancellationRequested && !alreadyCancelled)
{
try
{
await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false);
}
catch
{
// ignore whatever happens here
}
}
_orchestrator.ClearDownloadRequest(requestId);
}
}
}