extract partial downloads

This commit is contained in:
Stanley Dimant
2025-02-21 21:29:54 +01:00
committed by Loporrit
parent 57fbf2b983
commit 2ae94b1357
3 changed files with 66 additions and 34 deletions

View File

@@ -34,8 +34,9 @@ internal sealed class DalamudLogger : ILogger
else else
{ {
StringBuilder sb = new(); StringBuilder sb = new();
sb.AppendLine($"[{_name}]{{{(int)logLevel}}} {state}: {exception?.Message}"); sb.Append($"[{_name}]{{{(int)logLevel}}} {state}: {exception?.Message}");
sb.AppendLine(exception?.StackTrace); if (!string.IsNullOrWhiteSpace(exception?.StackTrace))
sb.AppendLine(exception?.StackTrace);
var innerException = exception?.InnerException; var innerException = exception?.InnerException;
while (innerException != null) while (innerException != null)
{ {

View File

@@ -249,7 +249,6 @@ public sealed class PairHandler : DisposableMediatorSubscriberBase
if (!disposing) return; if (!disposing) return;
SetUploading(isUploading: false); SetUploading(isUploading: false);
_downloadManager.Dispose();
var name = PlayerName; var name = PlayerName;
Logger.LogDebug("Disposing {name} ({user})", name, Pair); Logger.LogDebug("Disposing {name} ({user})", name, Pair);
try try
@@ -319,7 +318,7 @@ public sealed class PairHandler : DisposableMediatorSubscriberBase
} }
else else
{ {
var cts = new CancellationTokenSource(); using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(60)); cts.CancelAfter(TimeSpan.FromSeconds(60));
Logger.LogInformation("[{applicationId}] CachedData is null {isNull}, contains things: {contains}", applicationId, _cachedData == null, _cachedData?.FileReplacements.Any() ?? false); Logger.LogInformation("[{applicationId}] CachedData is null {isNull}, contains things: {contains}", applicationId, _cachedData == null, _cachedData?.FileReplacements.Any() ?? false);
@@ -336,8 +335,6 @@ public sealed class PairHandler : DisposableMediatorSubscriberBase
break; break;
} }
} }
cts.CancelDispose();
} }
} }
} }
@@ -442,6 +439,8 @@ public sealed class PairHandler : DisposableMediatorSubscriberBase
}); });
} }
private Task? _pairDownloadTask;
private async Task DownloadAndApplyCharacterAsync(Guid applicationBase, CharacterData charaData, Dictionary<ObjectKind, HashSet<PlayerChanges>> updatedData, private async Task DownloadAndApplyCharacterAsync(Guid applicationBase, CharacterData charaData, Dictionary<ObjectKind, HashSet<PlayerChanges>> updatedData,
bool updateModdedPaths, bool updateManip, CancellationToken downloadToken) bool updateModdedPaths, bool updateManip, CancellationToken downloadToken)
{ {
@@ -456,7 +455,12 @@ public sealed class PairHandler : DisposableMediatorSubscriberBase
while (toDownloadReplacements.Count > 0 && attempts++ <= 10 && !downloadToken.IsCancellationRequested) while (toDownloadReplacements.Count > 0 && attempts++ <= 10 && !downloadToken.IsCancellationRequested)
{ {
_downloadManager.CancelDownload(); if (_pairDownloadTask != null && !_pairDownloadTask.IsCompleted)
{
Logger.LogDebug("[BASE-{appBase}] Finishing prior running download task for player {name}, {kind}", applicationBase, PlayerName, updatedData);
await _pairDownloadTask.ConfigureAwait(false);
}
Logger.LogDebug("[BASE-{appBase}] Downloading missing files for player {name}, {kind}", applicationBase, PlayerName, updatedData); Logger.LogDebug("[BASE-{appBase}] Downloading missing files for player {name}, {kind}", applicationBase, PlayerName, updatedData);
Mediator.Publish(new EventMessage(new Event(PlayerName, Pair.UserData, nameof(PairHandler), EventSeverity.Informational, Mediator.Publish(new EventMessage(new Event(PlayerName, Pair.UserData, nameof(PairHandler), EventSeverity.Informational,
@@ -466,17 +470,17 @@ public sealed class PairHandler : DisposableMediatorSubscriberBase
if (!_playerPerformanceService.ComputeAndAutoPauseOnVRAMUsageThresholds(this, charaData, toDownloadFiles)) if (!_playerPerformanceService.ComputeAndAutoPauseOnVRAMUsageThresholds(this, charaData, toDownloadFiles))
{ {
Pair.HoldApplication("IndividualPerformanceThreshold", maxValue: 1); Pair.HoldApplication("IndividualPerformanceThreshold", maxValue: 1);
_downloadManager.CancelDownload(); _downloadManager.ClearDownload();
return; return;
} }
await _downloadManager.DownloadFiles(_charaHandler!, toDownloadReplacements, downloadToken).ConfigureAwait(false); _pairDownloadTask = Task.Run(async () => await _downloadManager.DownloadFiles(_charaHandler!, toDownloadReplacements, downloadToken).ConfigureAwait(false));
_downloadManager.CancelDownload();
await _pairDownloadTask.ConfigureAwait(false);
if (downloadToken.IsCancellationRequested) if (downloadToken.IsCancellationRequested)
{ {
Logger.LogTrace("[BASE-{appBase}] Detected cancellation", applicationBase); Logger.LogTrace("[BASE-{appBase}] Detected cancellation", applicationBase);
_downloadManager.CancelDownload();
return; return;
} }

View File

@@ -50,7 +50,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
public bool IsDownloading => !CurrentDownloads.Any(); public bool IsDownloading => !CurrentDownloads.Any();
public void CancelDownload() public void ClearDownload()
{ {
CurrentDownloads.Clear(); CurrentDownloads.Clear();
_downloadStatus.Clear(); _downloadStatus.Clear();
@@ -65,7 +65,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
} }
catch catch
{ {
CancelDownload(); ClearDownload();
} }
finally finally
{ {
@@ -76,8 +76,8 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
protected override void Dispose(bool disposing) protected override void Dispose(bool disposing)
{ {
CancelDownload(); ClearDownload();
foreach (var stream in _activeDownloadStreams) foreach (var stream in _activeDownloadStreams.ToList())
{ {
try try
{ {
@@ -112,7 +112,11 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
bool readHash = false; bool readHash = false;
while (true) while (true)
{ {
var readChar = (char)ConvertReadByte(fileBlockStream.ReadByte()); int readByte = fileBlockStream.ReadByte();
if (readByte == -1)
throw new EndOfStreamException();
var readChar = (char)ConvertReadByte(readByte);
if (readChar == ':') if (readChar == ':')
{ {
readHash = true; readHash = true;
@@ -122,6 +126,8 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
if (!readHash) hashName.Add(readChar); if (!readHash) hashName.Add(readChar);
else fileLength.Add(readChar); else fileLength.Add(readChar);
} }
if (fileLength.Count == 0)
fileLength.Add('0');
return (string.Join("", hashName), long.Parse(string.Join("", fileLength))); return (string.Join("", hashName), long.Parse(string.Join("", fileLength)));
} }
@@ -162,7 +168,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
var bytesRead = 0; var bytesRead = 0;
var limit = _orchestrator.DownloadLimitPerSlot(); var limit = _orchestrator.DownloadLimitPerSlot();
Logger.LogTrace("Starting Download of {id} with a speed limit of {limit}", requestId, limit); Logger.LogTrace("Starting Download of {id} with a speed limit of {limit} to {tempPath}", requestId, limit, tempPath);
stream = new ThrottledStream(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit); stream = new ThrottledStream(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit);
_activeDownloadStreams.Add(stream); _activeDownloadStreams.Add(stream);
while ((bytesRead = await stream.ReadAsync(buffer, ct).ConfigureAwait(false)) > 0) while ((bytesRead = await stream.ReadAsync(buffer, ct).ConfigureAwait(false)) > 0)
@@ -177,9 +183,12 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
Logger.LogDebug("{requestUrl} downloaded to {tempPath}", requestUrl, tempPath); Logger.LogDebug("{requestUrl} downloaded to {tempPath}", requestUrl, tempPath);
} }
} }
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex) catch (Exception ex)
{ {
Logger.LogWarning(ex, "Error during file download of {requestUrl}", requestUrl);
try try
{ {
if (!tempPath.IsNullOrEmpty()) if (!tempPath.IsNullOrEmpty())
@@ -262,6 +271,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
Logger.LogDebug("GUID {requestId} for {n} files on server {uri}", requestId, fileGroup.Count(), fileGroup.First().DownloadUri); Logger.LogDebug("GUID {requestId} for {n} files on server {uri}", requestId, fileGroup.Count(), fileGroup.First().DownloadUri);
var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk"); var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk");
FileInfo fi = new(blockFile);
try try
{ {
_downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForSlot; _downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForSlot;
@@ -283,26 +293,25 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
_orchestrator.ReleaseDownloadSlot(); Logger.LogDebug("{dlName}: Detected cancellation of download, partially extracting files for {id}", fi.Name, gameObjectHandler);
File.Delete(blockFile);
Logger.LogDebug("Detected cancellation, removing {id}", gameObjectHandler);
CancelDownload();
return;
} }
catch (Exception ex) catch (Exception ex)
{ {
_orchestrator.ReleaseDownloadSlot(); _orchestrator.ReleaseDownloadSlot();
File.Delete(blockFile); File.Delete(blockFile);
Logger.LogError(ex, "Error during download of {id}", requestId); Logger.LogError(ex, "{dlName}: Error during download of {id}", fi.Name, requestId);
CancelDownload(); ClearDownload();
return; return;
} }
FileStream? fileBlockStream = null; FileStream? fileBlockStream = null;
try try
{ {
_downloadStatus[fileGroup.Key].TransferredFiles = 1; if (_downloadStatus.TryGetValue(fileGroup.Key, out var status))
_downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.Decompressing; {
status.TransferredFiles = 1;
status.DownloadStatus = DownloadStatus.Decompressing;
}
fileBlockStream = File.OpenRead(blockFile); fileBlockStream = File.OpenRead(blockFile);
while (fileBlockStream.Position < fileBlockStream.Length) while (fileBlockStream.Position < fileBlockStream.Length)
{ {
@@ -310,29 +319,47 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
try try
{ {
Logger.LogDebug("Found file {file} with length {le}, decompressing download", fileHash, fileLengthBytes);
var fileExtension = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase)).GamePaths[0].Split(".")[^1]; var fileExtension = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase)).GamePaths[0].Split(".")[^1];
var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension);
Logger.LogDebug("{dlName}: Decompressing {file}:{le} => {dest}", fi.Name, fileHash, fileLengthBytes, filePath);
using var decompressedFile = new MemoryStream(64 * 1024); using var decompressedFile = new MemoryStream(64 * 1024);
using var innerFileStream = new LimitedStream(fileBlockStream, fileLengthBytes); using var innerFileStream = new LimitedStream(fileBlockStream, fileLengthBytes);
innerFileStream.DisposeUnderlying = false; innerFileStream.DisposeUnderlying = false;
using var decStream = LZ4Stream.Decode(innerFileStream, 0, true); using var decStream = LZ4Stream.Decode(innerFileStream, 0, true);
await decStream.CopyToAsync(decompressedFile, token); long startPos = fileBlockStream.Position;
await decStream.CopyToAsync(decompressedFile, CancellationToken.None).ConfigureAwait(false);
long readBytes = fileBlockStream.Position - startPos;
var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension); if (readBytes != fileLengthBytes)
await _fileCompactor.WriteAllBytesAsync(filePath, decompressedFile.ToArray(), token).ConfigureAwait(false); {
throw new EndOfStreamException();
}
await _fileCompactor.WriteAllBytesAsync(filePath, decompressedFile.ToArray(), CancellationToken.None).ConfigureAwait(false);
PersistFileToStorage(fileHash, filePath, fileLengthBytes); PersistFileToStorage(fileHash, filePath, fileLengthBytes);
} }
catch (EndOfStreamException)
{
Logger.LogWarning("{dlName}: Failure to extract file {fileHash}, stream ended prematurely", fi.Name, fileHash);
}
catch (Exception e) catch (Exception e)
{ {
Logger.LogWarning(e, "Error during decompression"); Logger.LogWarning(e, "{dlName}: Error during decompression of {hash}", fi.Name, fileHash);
foreach (var fr in fileReplacement)
Logger.LogWarning(" - {h}: {x}", fr.Hash, fr.GamePaths[0]);
} }
} }
} }
catch (EndOfStreamException)
{
Logger.LogDebug("{dlName}: Failure to extract file header data, stream ended", fi.Name);
}
catch (Exception ex) catch (Exception ex)
{ {
Logger.LogError(ex, "Error during block file read"); Logger.LogError(ex, "{dlName}: Error during block file read", fi.Name);
} }
finally finally
{ {
@@ -345,7 +372,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
Logger.LogDebug("Download end: {id}", gameObjectHandler); Logger.LogDebug("Download end: {id}", gameObjectHandler);
CancelDownload(); ClearDownload();
} }
private async Task<List<DownloadFileDto>> FilesGetSizes(List<string> hashes, CancellationToken ct) private async Task<List<DownloadFileDto>> FilesGetSizes(List<string> hashes, CancellationToken ct)