Files
ClubPenguinClient/MareSynchronos/WebAPI/Files/FileDownloadManager.cs
rootdarkarchon 324288652d fix some shit, add triangle count
check for invalid animations

I hate animations

ignore broken bones from god knows what

fix more idiotic mod things

fully ignore garbage skeletons that fail to process properly

fix my own mistakes

fix more bullshit

check for filename length and continue

idk some cleanup

fix spoopy skellingtons

change loglevel of tris
2025-02-16 04:21:02 +00:00

448 lines
18 KiB
C#

using Dalamud.Utility;
using K4os.Compression.LZ4.Streams;
using MareSynchronos.API.Data;
using MareSynchronos.API.Dto.Files;
using MareSynchronos.API.Routes;
using MareSynchronos.FileCache;
using MareSynchronos.PlayerData.Handlers;
using MareSynchronos.Services.Mediator;
using MareSynchronos.Utils;
using MareSynchronos.WebAPI.Files.Models;
using Microsoft.Extensions.Logging;
using System.Net;
using System.Net.Http.Json;
namespace MareSynchronos.WebAPI.Files;
public partial class FileDownloadManager : DisposableMediatorSubscriberBase
{
// Progress/state per download group; the key is the "host:port" string built in DownloadFilesInternal.
private readonly Dictionary<string, FileDownloadStatus> _downloadStatus;
private readonly FileCompactor _fileCompactor;
private readonly FileCacheManager _fileDbManager;
private readonly FileTransferOrchestrator _orchestrator;
// Throttled response streams of downloads that are currently being read.
// Download tasks add/remove entries concurrently, so access is guarded by locking the list itself.
private readonly List<ThrottledStream> _activeDownloadStreams;

/// <summary>
/// Creates the download manager and subscribes to <c>DownloadLimitChangedMessage</c> so that
/// the bandwidth limit of every stream that is currently downloading is updated when the limit changes.
/// </summary>
public FileDownloadManager(ILogger<FileDownloadManager> logger, MareMediator mediator,
    FileTransferOrchestrator orchestrator,
    FileCacheManager fileCacheManager, FileCompactor fileCompactor) : base(logger, mediator)
{
    _downloadStatus = new Dictionary<string, FileDownloadStatus>(StringComparer.Ordinal);
    _orchestrator = orchestrator;
    _fileDbManager = fileCacheManager;
    _fileCompactor = fileCompactor;
    _activeDownloadStreams = [];

    Mediator.Subscribe<DownloadLimitChangedMessage>(this, (msg) =>
    {
        // Work on a snapshot: download tasks may add or remove streams while this handler runs,
        // and enumerating the live list would then throw "collection was modified".
        ThrottledStream[] streams;
        lock (_activeDownloadStreams)
        {
            if (_activeDownloadStreams.Count == 0) return;
            streams = _activeDownloadStreams.ToArray();
        }

        var newLimit = _orchestrator.DownloadLimitPerSlot();
        Logger.LogTrace("Setting new Download Speed Limit to {newLimit}", newLimit);
        foreach (var stream in streams)
        {
            stream.BandwidthLimit = newLimit;
        }
    });
}
/// <summary>Transfers belonging to the download run that was started most recently.</summary>
public List<DownloadFileTransfer> CurrentDownloads { get; private set; } = new();

/// <summary>Forbidden transfers, as tracked by the orchestrator.</summary>
public List<FileTransfer> ForbiddenTransfers
{
    get { return _orchestrator.ForbiddenTransfers; }
}
/// <summary>
/// <c>true</c> while <see cref="CurrentDownloads"/> contains at least one transfer.
/// </summary>
/// <remarks>
/// The previous implementation was negated and reported <c>true</c> only when nothing was queued.
/// NOTE(review): callers that compensated for the inverted value need to be adjusted.
/// </remarks>
public bool IsDownloading => CurrentDownloads.Count > 0;
/// <summary>
/// Resets the download bookkeeping by emptying both the per-group status map and
/// <see cref="CurrentDownloads"/>. This method only clears state; it does not signal any cancellation token itself.
/// </summary>
public void CancelDownload()
{
    _downloadStatus.Clear();
    CurrentDownloads.Clear();
}
/// <summary>
/// Downloads the given file replacements for <paramref name="gameObject"/>.
/// Scanning is halted for the duration of the download and resumed afterwards, and a
/// <c>DownloadFinishedMessage</c> is always published, whether the download succeeded or not.
/// </summary>
/// <param name="gameObject">Handler the download is performed for.</param>
/// <param name="fileReplacementDto">File replacements whose hashes should be downloaded.</param>
/// <param name="ct">Token used to cancel the download.</param>
/// <remarks>
/// Failures are not propagated to the caller: the download state is reset and the problem is logged.
/// </remarks>
public async Task DownloadFiles(GameObjectHandler gameObject, List<FileReplacementData> fileReplacementDto, CancellationToken ct)
{
    Mediator.Publish(new HaltScanMessage(nameof(DownloadFiles)));
    try
    {
        await DownloadFilesInternal(gameObject, fileReplacementDto, ct).ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // Cancellation is an expected outcome; keep it out of the warning log.
        Logger.LogDebug("Download for {id} was cancelled", gameObject);
        CancelDownload();
    }
    catch (Exception ex)
    {
        // Still best-effort (no rethrow), but no longer silent so failures can be diagnosed.
        Logger.LogWarning(ex, "Download for {id} failed", gameObject);
        CancelDownload();
    }
    finally
    {
        Mediator.Publish(new DownloadFinishedMessage(gameObject));
        Mediator.Publish(new ResumeScanMessage(nameof(DownloadFiles)));
    }
}
/// <summary>
/// Resets the download state and disposes every stream that is still registered as active
/// before handing over to the base implementation.
/// </summary>
protected override void Dispose(bool disposing)
{
    CancelDownload();

    // Disposing a stream makes the owning download task fail and remove that stream from
    // _activeDownloadStreams on another thread. Iterate over a snapshot so that removal
    // cannot invalidate the enumeration and throw out of Dispose.
    ThrottledStream[] streams;
    lock (_activeDownloadStreams)
    {
        streams = _activeDownloadStreams.ToArray();
    }

    foreach (var stream in streams)
    {
        try
        {
            stream.Dispose();
        }
        catch (Exception ex)
        {
            // Best effort: a stream that cannot be disposed must not prevent the remaining cleanup.
            Logger.LogDebug(ex, "Error disposing active download stream");
        }
    }

    base.Dispose(disposing);
}
/// <summary>
/// Converts the result of a <c>Stream.ReadByte()</c> call into a byte.
/// </summary>
/// <param name="byteOrEof">Value returned by <c>ReadByte()</c>; -1 signals end of stream.</param>
/// <returns>The value cast to <see cref="byte"/>.</returns>
/// <exception cref="EndOfStreamException">Thrown when <paramref name="byteOrEof"/> is -1.</exception>
private static byte ConvertReadByte(int byteOrEof)
{
    return byteOrEof != -1
        ? (byte)byteOrEof
        : throw new EndOfStreamException();
}
/// <summary>
/// Reads one entry header of the form <c>#HASH:LENGTH#</c> from the current position of
/// <paramref name="fileBlockStream"/>, leaving the stream positioned directly after the closing <c>#</c>.
/// </summary>
/// <param name="fileBlockStream">Block file stream positioned at the start of a header.</param>
/// <returns>The hash text and the numeric length that follow it in the header.</returns>
/// <exception cref="InvalidDataException">
/// The header does not start with <c>#</c>, or the length part is not a valid integer.
/// </exception>
/// <exception cref="EndOfStreamException">The stream ends before the header is complete.</exception>
private static (string fileHash, long fileLengthBytes) ReadBlockFileHeader(FileStream fileBlockStream)
{
    var leading = (char)ConvertReadByte(fileBlockStream.ReadByte());
    if (leading != '#') throw new InvalidDataException("Data is invalid, first char is not #");

    var hashText = new System.Text.StringBuilder();
    var lengthText = new System.Text.StringBuilder();
    bool hashComplete = false;
    while (true)
    {
        var current = (char)ConvertReadByte(fileBlockStream.ReadByte());
        if (current == '#') break;
        if (current == ':')
        {
            // Everything after the separator belongs to the length.
            hashComplete = true;
            continue;
        }

        (hashComplete ? lengthText : hashText).Append(current);
    }

    // The header is machine-written, so parse culture-independently and report a malformed
    // length the same way as a malformed header start instead of leaking a FormatException.
    if (!long.TryParse(lengthText.ToString(), System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture, out long fileLengthBytes))
    {
        throw new InvalidDataException($"Data is invalid, file length '{lengthText}' is not a number");
    }

    return (hashText.ToString(), fileLengthBytes);
}
/// <summary>
/// Waits until the request is reported as ready, then streams the response for
/// <paramref name="requestId"/> into <paramref name="tempPath"/> through a <c>ThrottledStream</c>.
/// </summary>
/// <param name="downloadGroup">Key of the download group whose status is updated.</param>
/// <param name="requestId">Request id used to build the download URL.</param>
/// <param name="fileTransfer">Transfers of this group; the first entry supplies the download URI.</param>
/// <param name="tempPath">File the downloaded data is written to; deleted again if the download fails.</param>
/// <param name="progress">Receives the number of bytes written for every chunk.</param>
/// <param name="ct">Token used to cancel waiting and downloading.</param>
/// <exception cref="InvalidDataException">The server answered with 404 or 401.</exception>
/// <exception cref="HttpRequestException">Any other failed HTTP request or non-success status code.</exception>
private async Task DownloadAndMungeFileHttpClient(string downloadGroup, Guid requestId, List<DownloadFileTransfer> fileTransfer, string tempPath, IProgress<long> progress, CancellationToken ct)
{
    Logger.LogDebug("GUID {requestId} on server {uri} for files {files}", requestId, fileTransfer[0].DownloadUri, string.Join(", ", fileTransfer.Select(c => c.Hash).ToList()));

    await WaitForDownloadReady(fileTransfer, requestId, ct).ConfigureAwait(false);

    _downloadStatus[downloadGroup].DownloadStatus = DownloadStatus.Downloading;

    var requestUrl = MareFiles.CacheGetFullPath(fileTransfer[0].DownloadUri, requestId);
    Logger.LogDebug("Downloading {requestUrl} for request {id}", requestUrl, requestId);

    HttpResponseMessage response = null!;
    try
    {
        try
        {
            response = await _orchestrator.SendRequestAsync(HttpMethod.Get, requestUrl, ct, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
            response.EnsureSuccessStatusCode();
        }
        catch (HttpRequestException ex)
        {
            Logger.LogWarning(ex, "Error during download of {requestUrl}, HttpStatusCode: {code}", requestUrl, ex.StatusCode);
            if (ex.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.Unauthorized)
            {
                throw new InvalidDataException($"Http error {ex.StatusCode} (cancelled: {ct.IsCancellationRequested}): {requestUrl}", ex);
            }

            // Every other HTTP failure has to abort as well: continuing would either dereference
            // a null response or store the body of an error response as if it were the block file.
            throw;
        }

        ThrottledStream? stream = null;
        try
        {
            var fileStream = File.Create(tempPath);
            await using (fileStream.ConfigureAwait(false))
            {
                var bufferSize = response.Content.Headers.ContentLength > 1024 * 1024 ? 65536 : 8196;
                var buffer = new byte[bufferSize];

                var bytesRead = 0;
                var limit = _orchestrator.DownloadLimitPerSlot();
                Logger.LogTrace("Starting Download of {id} with a speed limit of {limit}", requestId, limit);
                stream = new ThrottledStream(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit);

                // Several download groups run in parallel, so the shared list must not be mutated unguarded.
                lock (_activeDownloadStreams)
                {
                    _activeDownloadStreams.Add(stream);
                }

                while ((bytesRead = await stream.ReadAsync(buffer, ct).ConfigureAwait(false)) > 0)
                {
                    ct.ThrowIfCancellationRequested();

                    await fileStream.WriteAsync(buffer.AsMemory(0, bytesRead), ct).ConfigureAwait(false);

                    progress.Report(bytesRead);
                }

                Logger.LogDebug("{requestUrl} downloaded to {tempPath}", requestUrl, tempPath);
            }
        }
        catch (Exception ex)
        {
            Logger.LogWarning(ex, "Error during file download of {requestUrl}", requestUrl);
            try
            {
                // The file stream is already closed here because the await-using scope has been left.
                if (!tempPath.IsNullOrEmpty())
                    File.Delete(tempPath);
            }
            catch
            {
                // ignore if file deletion fails
            }
            throw;
        }
        finally
        {
            if (stream != null)
            {
                lock (_activeDownloadStreams)
                {
                    _activeDownloadStreams.Remove(stream);
                }
                await stream.DisposeAsync().ConfigureAwait(false);
            }
        }
    }
    finally
    {
        // Release the response once the body has been consumed or the attempt was abandoned.
        response?.Dispose();
    }
}
/// <summary>
/// Resolves download information for the hashes in <paramref name="fileReplacement"/>, groups the
/// transferable files by download server ("host:port"), downloads one block file per group in
/// parallel and unpacks every entry of each block file into the file cache.
/// </summary>
/// <param name="gameObjectHandler">Handler the download is performed for; used for messages and logging.</param>
/// <param name="fileReplacement">File replacements to download; also used to look up the file extension per hash.</param>
/// <param name="ct">Token used to cancel the download.</param>
private async Task DownloadFilesInternal(GameObjectHandler gameObjectHandler, List<FileReplacementData> fileReplacement, CancellationToken ct)
{
    Logger.LogDebug("Download start: {id}", gameObjectHandler.Name);

    List<DownloadFileDto> downloadFileInfoFromService =
    [
        .. await FilesGetSizes(fileReplacement.Select(f => f.Hash).Distinct(StringComparer.Ordinal).ToList(), ct).ConfigureAwait(false),
    ];

    Logger.LogDebug("Files with size 0 or less: {files}", string.Join(", ", downloadFileInfoFromService.Where(f => f.Size <= 0).Select(f => f.Hash)));

    CurrentDownloads = downloadFileInfoFromService.Distinct().Select(d => new DownloadFileTransfer(d))
        .Where(d => d.CanBeTransferred).ToList();

    foreach (var dto in downloadFileInfoFromService.Where(c => c.IsForbidden))
    {
        if (!_orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal)))
        {
            _orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto));
        }
    }

    // Materialise the groups once: the query is consumed several times below and its source
    // list can be cleared by CancelDownload() while downloads are still running.
    var downloadGroups = CurrentDownloads.Where(f => f.CanBeTransferred)
        .GroupBy(f => f.DownloadUri.Host + ":" + f.DownloadUri.Port, StringComparer.Ordinal)
        .ToList();

    foreach (var downloadGroup in downloadGroups)
    {
        _downloadStatus[downloadGroup.Key] = new FileDownloadStatus()
        {
            DownloadStatus = DownloadStatus.Initializing,
            TotalBytes = downloadGroup.Sum(c => c.Total),
            TotalFiles = 1,
            TransferredBytes = 0,
            TransferredFiles = 0
        };
    }

    Mediator.Publish(new DownloadStartedMessage(gameObjectHandler, _downloadStatus));

    await Parallel.ForEachAsync(downloadGroups, new ParallelOptions()
    {
        // ParallelOptions rejects 0, which is what the group count is when nothing can be transferred.
        MaxDegreeOfParallelism = Math.Max(1, downloadGroups.Count),
        CancellationToken = ct,
    },
    async (fileGroup, token) =>
    {
        // let server predownload files
        string enqueueResult;
        using (var requestIdResponse = await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.RequestEnqueueFullPath(fileGroup.First().DownloadUri),
            fileGroup.Select(c => c.Hash), token).ConfigureAwait(false))
        {
            // Read the body once and reuse it for both the log line and the request id.
            enqueueResult = await requestIdResponse.Content.ReadAsStringAsync(token).ConfigureAwait(false);
        }

        Logger.LogDebug("Sent request for {n} files on server {uri} with result {result}", fileGroup.Count(), fileGroup.First().DownloadUri,
            enqueueResult);

        Guid requestId = Guid.Parse(enqueueResult.Trim('"'));

        Logger.LogDebug("GUID {requestId} for {n} files on server {uri}", requestId, fileGroup.Count(), fileGroup.First().DownloadUri);

        var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk");

        // Only release a slot that was actually obtained; waiting for the slot can itself be
        // cancelled or fail, in which case there is nothing to give back.
        bool slotAcquired = false;
        try
        {
            _downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForSlot;
            await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false);
            slotAcquired = true;

            _downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForQueue;

            Progress<long> progress = new((bytesDownloaded) =>
            {
                try
                {
                    // The status entry disappears when the download is cancelled in the meantime.
                    if (!_downloadStatus.TryGetValue(fileGroup.Key, out FileDownloadStatus? value)) return;
                    value.TransferredBytes += bytesDownloaded;
                }
                catch (Exception ex)
                {
                    Logger.LogWarning(ex, "Could not set download progress");
                }
            });

            await DownloadAndMungeFileHttpClient(fileGroup.Key, requestId, [.. fileGroup], blockFile, progress, token).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            if (slotAcquired) _orchestrator.ReleaseDownloadSlot();
            File.Delete(blockFile);
            Logger.LogDebug("Detected cancellation, removing {id}", gameObjectHandler);
            CancelDownload();
            return;
        }
        catch (Exception ex)
        {
            if (slotAcquired) _orchestrator.ReleaseDownloadSlot();
            File.Delete(blockFile);
            Logger.LogError(ex, "Error during download of {id}", requestId);
            CancelDownload();
            return;
        }

        FileStream? fileBlockStream = null;
        try
        {
            _downloadStatus[fileGroup.Key].TransferredFiles = 1;
            _downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.Decompressing;
            fileBlockStream = File.OpenRead(blockFile);
            while (fileBlockStream.Position < fileBlockStream.Length)
            {
                (string fileHash, long fileLengthBytes) = ReadBlockFileHeader(fileBlockStream);

                // Where the next entry header starts, independent of how far decompression gets.
                long nextHeaderPosition = fileBlockStream.Position + fileLengthBytes;

                try
                {
                    Logger.LogDebug("Found file {file} with length {le}, decompressing download", fileHash, fileLengthBytes);
                    var fileExtension = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase)).GamePaths[0].Split(".")[^1];

                    using var decompressedFile = new MemoryStream(64 * 1024);
                    using var innerFileStream = new LimitedStream(fileBlockStream, fileLengthBytes);
                    innerFileStream.DisposeUnderlying = false;
                    using var decStream = LZ4Stream.Decode(innerFileStream, 0, true);
                    await decStream.CopyToAsync(decompressedFile, token).ConfigureAwait(false);

                    var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension);
                    await _fileCompactor.WriteAllBytesAsync(filePath, decompressedFile.ToArray(), token).ConfigureAwait(false);

                    PersistFileToStorage(fileHash, filePath);
                }
                catch (Exception e)
                {
                    Logger.LogWarning(e, "Error during decompression");
                }

                // If an entry was skipped or only partially consumed, jump to the next header.
                // Otherwise the leftover payload would be parsed as a header and abort all remaining files.
                if (fileBlockStream.Position < nextHeaderPosition)
                {
                    fileBlockStream.Position = Math.Min(nextHeaderPosition, fileBlockStream.Length);
                }
            }
        }
        catch (Exception ex)
        {
            Logger.LogError(ex, "Error during block file read");
        }
        finally
        {
            _orchestrator.ReleaseDownloadSlot();
            if (fileBlockStream != null)
                await fileBlockStream.DisposeAsync().ConfigureAwait(false);
            File.Delete(blockFile);
        }
    }).ConfigureAwait(false);

    Logger.LogDebug("Download end: {id}", gameObjectHandler);

    CancelDownload();
}
/// <summary>
/// Requests download information for the given hashes from the files CDN.
/// </summary>
/// <param name="hashes">Hashes to query.</param>
/// <param name="ct">Token used to cancel the request.</param>
/// <returns>The deserialized response, or an empty list when the response body deserializes to null.</returns>
/// <exception cref="InvalidOperationException">The orchestrator is not initialized.</exception>
private async Task<List<DownloadFileDto>> FilesGetSizes(List<string> hashes, CancellationToken ct)
{
    if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized");

    // Dispose the response once its content has been read.
    using var response = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.ServerFilesGetSizesFullPath(_orchestrator.FilesCdnUri!), hashes, ct).ConfigureAwait(false);
    return await response.Content.ReadFromJsonAsync<List<DownloadFileDto>>(cancellationToken: ct).ConfigureAwait(false) ?? [];
}
/// <summary>
/// Stamps the extracted file with new timestamps and registers it in the file cache.
/// Creation and last-write time are each set to a random day between 1995-01-01 and today,
/// last-access time is set to today. If the cache entry created for the file reports a hash
/// other than <paramref name="fileHash"/>, the file and its cache entry are removed again.
/// </summary>
/// <param name="fileHash">Hash the file is expected to have.</param>
/// <param name="filePath">Path of the extracted file.</param>
private void PersistFileToStorage(string fileHash, string filePath)
{
    // Picks a random day in [1995-01-01 01:01:01 local, today).
    static DateTime PickRandomPastDay()
    {
        var earliest = new DateTime(1995, 1, 1, 1, 1, 1, DateTimeKind.Local);
        var rng = new Random();
        int daySpan = (DateTime.Today - earliest).Days;
        return earliest.AddDays(rng.Next(daySpan));
    }

    var cachedFile = new FileInfo(filePath);
    cachedFile.CreationTime = PickRandomPastDay();
    cachedFile.LastAccessTime = DateTime.Today;
    cachedFile.LastWriteTime = PickRandomPastDay();

    try
    {
        var entry = _fileDbManager.CreateCacheEntry(filePath);
        bool hashMatches = entry == null || string.Equals(entry.Hash, fileHash, StringComparison.OrdinalIgnoreCase);
        if (hashMatches)
        {
            return;
        }

        Logger.LogError("Hash mismatch after extracting, got {hash}, expected {expectedHash}, deleting file", entry!.Hash, fileHash);
        File.Delete(filePath);
        _fileDbManager.RemoveHashedFile(entry.Hash, entry.PrefixedFilePath);
    }
    catch (Exception ex)
    {
        Logger.LogWarning(ex, "Error creating cache entry");
    }
}
/// <summary>
/// Polls the orchestrator until the download identified by <paramref name="requestId"/> is ready.
/// Readiness is checked every 250 ms; whenever five seconds pass without the download becoming ready,
/// the server-side queue is queried and the five second window starts again.
/// </summary>
/// <param name="downloadFileTransfer">Transfers of the request; the first entry supplies the server URI.</param>
/// <param name="requestId">Id of the request that is waited for.</param>
/// <param name="downloadCt">Token that cancels the wait.</param>
/// <remarks>
/// On cancellation a cancel request is sent to the server (failures of that request are ignored),
/// and the request is always cleared from the orchestrator when the method exits.
/// </remarks>
private async Task WaitForDownloadReady(List<DownloadFileTransfer> downloadFileTransfer, Guid requestId, CancellationToken downloadCt)
{
    bool alreadyCancelled = false;
    try
    {
        CancellationTokenSource localTimeoutCts = new();
        localTimeoutCts.CancelAfter(TimeSpan.FromSeconds(5));
        CancellationTokenSource composite = CancellationTokenSource.CreateLinkedTokenSource(downloadCt, localTimeoutCts.Token);
        try
        {
            while (!_orchestrator.IsDownloadReady(requestId))
            {
                try
                {
                    await Task.Delay(250, composite.Token).ConfigureAwait(false);
                }
                catch (TaskCanceledException)
                {
                    // Real cancellation is handled by the outer handler; anything else is the local timeout.
                    if (downloadCt.IsCancellationRequested) throw;

                    using var req = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCheckQueueFullPath(downloadFileTransfer[0].DownloadUri, requestId),
                        downloadFileTransfer.Select(c => c.Hash).ToList(), downloadCt).ConfigureAwait(false);
                    req.EnsureSuccessStatusCode();

                    // Start a fresh timeout window.
                    composite.Dispose();
                    localTimeoutCts.Dispose();
                    localTimeoutCts = new();
                    localTimeoutCts.CancelAfter(TimeSpan.FromSeconds(5));
                    composite = CancellationTokenSource.CreateLinkedTokenSource(downloadCt, localTimeoutCts.Token);
                }
            }
        }
        finally
        {
            // Dispose on every exit path, including exceptions; disposing an already disposed source is harmless.
            composite.Dispose();
            localTimeoutCts.Dispose();
        }

        Logger.LogDebug("Download {requestId} ready", requestId);
    }
    catch (TaskCanceledException)
    {
        try
        {
            await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false);
            alreadyCancelled = true;
        }
        catch
        {
            // ignore whatever happens here
        }

        throw;
    }
    finally
    {
        // Covers cancellation that surfaced as something other than TaskCanceledException,
        // or a cancel request above that did not go through.
        if (downloadCt.IsCancellationRequested && !alreadyCancelled)
        {
            try
            {
                await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false);
            }
            catch
            {
                // ignore whatever happens here
            }
        }
        _orchestrator.ClearDownloadRequest(requestId);
    }
}
}