Single file transfers (#56)
* move to single file transfer and extraction per server download * clean up downloads --------- Co-authored-by: rootdarkarchon <root.darkarchon@outlook.com>
This commit is contained in:
2
MareAPI
2
MareAPI
Submodule MareAPI updated: a5373bca24...4aacbb78bb
@@ -63,7 +63,7 @@ public sealed class PeriodicFileScanner : DisposableMediatorSubscriberBase
|
||||
_scanCancellationTokenSource?.Cancel();
|
||||
_scanCancellationTokenSource = new CancellationTokenSource();
|
||||
var token = _scanCancellationTokenSource.Token;
|
||||
Task.Run(async () =>
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
while (!token.IsCancellationRequested)
|
||||
{
|
||||
|
||||
@@ -59,6 +59,7 @@ public sealed class PairHandler : DisposableMediatorSubscriberBase
|
||||
Mediator.Subscribe<FrameworkUpdateMessage>(this, (_) => FrameworkUpdate());
|
||||
Mediator.Subscribe<ZoneSwitchStartMessage>(this, (_) =>
|
||||
{
|
||||
_downloadCancellationTokenSource?.CancelDispose();
|
||||
MediatorUnsubscribeFromCharacterChanged();
|
||||
_charaHandler?.Invalidate();
|
||||
IsVisible = false;
|
||||
|
||||
@@ -77,20 +77,41 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
|
||||
private async Task DownloadFileHttpClient(string downloadGroup, DownloadFileTransfer fileTransfer, string tempPath, IProgress<long> progress, CancellationToken ct)
|
||||
private static (string fileHash, long fileLengthBytes) ReadBlockFileHeader(FileStream fileBlockStream)
|
||||
{
|
||||
var requestId = await GetQueueRequest(fileTransfer, ct).ConfigureAwait(false);
|
||||
List<char> hashName = new();
|
||||
List<char> fileLength = new();
|
||||
var separator = (char)fileBlockStream.ReadByte();
|
||||
if (separator != '#') throw new InvalidDataException("Data is invalid, first char is not #");
|
||||
|
||||
Logger.LogDebug("GUID {requestId} for file {hash} on server {uri}", requestId, fileTransfer.Hash, fileTransfer.DownloadUri);
|
||||
bool readHash = false;
|
||||
while (true)
|
||||
{
|
||||
var readChar = (char)fileBlockStream.ReadByte();
|
||||
if (readChar == ':')
|
||||
{
|
||||
readHash = true;
|
||||
continue;
|
||||
}
|
||||
if (readChar == '#') break;
|
||||
if (!readHash) hashName.Add(readChar);
|
||||
else fileLength.Add(readChar);
|
||||
}
|
||||
return (string.Join("", hashName), long.Parse(string.Join("", fileLength)));
|
||||
}
|
||||
|
||||
private async Task DownloadFileHttpClient(string downloadGroup, Guid requestId, List<DownloadFileTransfer> fileTransfer, string tempPath, IProgress<long> progress, CancellationToken ct)
|
||||
{
|
||||
Logger.LogDebug("GUID {requestId} on server {uri} for files {files}", requestId, fileTransfer[0].DownloadUri, string.Join(", ", fileTransfer.Select(c => c.Hash).ToList()));
|
||||
|
||||
await WaitForDownloadReady(fileTransfer, requestId, ct).ConfigureAwait(false);
|
||||
|
||||
_downloadStatus[downloadGroup].DownloadStatus = DownloadStatus.Downloading;
|
||||
|
||||
HttpResponseMessage response = null!;
|
||||
var requestUrl = MareFiles.CacheGetFullPath(fileTransfer.DownloadUri, requestId);
|
||||
var requestUrl = MareFiles.CacheGetFullPath(fileTransfer[0].DownloadUri, requestId);
|
||||
|
||||
Logger.LogDebug("Downloading {requestUrl} for file {hash}", requestUrl, fileTransfer.Hash);
|
||||
Logger.LogDebug("Downloading {requestUrl} for request {id}", requestUrl, requestId);
|
||||
try
|
||||
{
|
||||
response = await _orchestrator.SendRequestAsync(HttpMethod.Get, requestUrl, ct).ConfigureAwait(false);
|
||||
@@ -144,7 +165,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
|
||||
private async Task DownloadFilesInternal(GameObjectHandler gameObjectHandler, List<FileReplacementData> fileReplacement, CancellationToken ct)
|
||||
{
|
||||
Logger.LogDebug("Downloading files for {id}", gameObjectHandler.Name);
|
||||
Logger.LogDebug("Download start: {id}", gameObjectHandler.Name);
|
||||
|
||||
List<DownloadFileDto> downloadFileInfoFromService = new();
|
||||
downloadFileInfoFromService.AddRange(await FilesGetSizes(fileReplacement.Select(f => f.Hash).ToList(), ct).ConfigureAwait(false));
|
||||
@@ -156,7 +177,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
|
||||
foreach (var dto in downloadFileInfoFromService.Where(c => c.IsForbidden))
|
||||
{
|
||||
if (!_orchestrator.ForbiddenTransfers.Any(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal)))
|
||||
if (!_orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal)))
|
||||
{
|
||||
_orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto));
|
||||
}
|
||||
@@ -170,7 +191,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
{
|
||||
DownloadStatus = DownloadStatus.Initializing,
|
||||
TotalBytes = downloadGroup.Sum(c => c.Total),
|
||||
TotalFiles = downloadGroup.Count(),
|
||||
TotalFiles = 1,
|
||||
TransferredBytes = 0,
|
||||
TransferredFiles = 0
|
||||
};
|
||||
@@ -186,88 +207,96 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
async (fileGroup, token) =>
|
||||
{
|
||||
// let server predownload files
|
||||
await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.RequestEnqueueFullPath(fileGroup.First().DownloadUri),
|
||||
var requestIdResponse = await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.RequestEnqueueFullPath(fileGroup.First().DownloadUri),
|
||||
fileGroup.Select(c => c.Hash), token).ConfigureAwait(false);
|
||||
Logger.LogDebug("Sent request for {n} files on server {uri} with result {result}", fileGroup.Count(), fileGroup.First().DownloadUri, requestIdResponse.Content.ReadAsStringAsync().Result);
|
||||
|
||||
foreach (var file in fileGroup)
|
||||
Guid requestId = Guid.Parse(requestIdResponse.Content.ReadAsStringAsync().Result.Trim('"'));
|
||||
_downloadReady[requestId] = false;
|
||||
|
||||
Logger.LogDebug("GUID {requestId} for {n} files on server {uri}", requestId, fileGroup.Count(), fileGroup.First().DownloadUri);
|
||||
|
||||
var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk", true);
|
||||
try
|
||||
{
|
||||
var ext = fileReplacement.First(f => string.Equals(f.Hash, file.Hash, StringComparison.OrdinalIgnoreCase)).GamePaths.First().Split(".").Last();
|
||||
var tempPath = _fileDbManager.GetCacheFilePath(file.Hash, ext, isTemporaryFile: true);
|
||||
_downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForSlot;
|
||||
await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false);
|
||||
_downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForQueue;
|
||||
Progress<long> progress = new((bytesDownloaded) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!_downloadStatus.ContainsKey(fileGroup.Key)) return;
|
||||
_downloadStatus[fileGroup.Key].TransferredBytes += bytesDownloaded;
|
||||
file.Transferred += bytesDownloaded;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogWarning(ex, "Could not set download progress");
|
||||
}
|
||||
});
|
||||
await DownloadFileHttpClient(fileGroup.Key, requestId, fileGroup.ToList(), blockFile, progress, token).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
_orchestrator.ReleaseDownloadSlot();
|
||||
File.Delete(blockFile);
|
||||
Logger.LogDebug("Detected cancellation, removing {id}", gameObjectHandler);
|
||||
CancelDownload();
|
||||
return;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_orchestrator.ReleaseDownloadSlot();
|
||||
File.Delete(blockFile);
|
||||
Logger.LogError(ex, "Error during download of {id}", requestId);
|
||||
CancelDownload();
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
_downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForSlot;
|
||||
await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false);
|
||||
_downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForQueue;
|
||||
await DownloadFileHttpClient(fileGroup.Key, file, tempPath, progress, token).ConfigureAwait(false);
|
||||
_downloadStatus[fileGroup.Key].TransferredFiles += 1;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
File.Delete(tempPath);
|
||||
Logger.LogDebug("Detected cancellation, removing {id}", gameObjectHandler);
|
||||
CancelDownload();
|
||||
return;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogError(ex, "Error during download of {hash}", file.Hash);
|
||||
continue;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_orchestrator.ReleaseDownloadSlot();
|
||||
}
|
||||
|
||||
FileStream? fileBlockStream = null;
|
||||
try
|
||||
{
|
||||
_downloadStatus[fileGroup.Key].TransferredFiles = 1;
|
||||
_downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.Decompressing;
|
||||
var tempFileData = await File.ReadAllBytesAsync(tempPath, token).ConfigureAwait(false);
|
||||
var extractedFile = LZ4Codec.Unwrap(tempFileData);
|
||||
File.Delete(tempPath);
|
||||
var filePath = _fileDbManager.GetCacheFilePath(file.Hash, ext, isTemporaryFile: false);
|
||||
await File.WriteAllBytesAsync(filePath, extractedFile, token).ConfigureAwait(false);
|
||||
var fi = new FileInfo(filePath);
|
||||
Func<DateTime> RandomDayInThePast()
|
||||
fileBlockStream = File.OpenRead(blockFile);
|
||||
while (fileBlockStream.Position < fileBlockStream.Length)
|
||||
{
|
||||
DateTime start = new(1995, 1, 1);
|
||||
Random gen = new();
|
||||
int range = (DateTime.Today - start).Days;
|
||||
return () => start.AddDays(gen.Next(range));
|
||||
}
|
||||
(string fileHash, long fileLengthBytes) = ReadBlockFileHeader(fileBlockStream);
|
||||
|
||||
fi.CreationTime = RandomDayInThePast().Invoke();
|
||||
fi.LastAccessTime = DateTime.Today;
|
||||
fi.LastWriteTime = RandomDayInThePast().Invoke();
|
||||
try
|
||||
{
|
||||
var entry = _fileDbManager.CreateCacheEntry(filePath);
|
||||
if (!string.Equals(entry?.Hash, file.Hash, StringComparison.OrdinalIgnoreCase))
|
||||
try
|
||||
{
|
||||
Logger.LogError("Hash mismatch after extracting, got {hash}, expected {expectedHash}, deleting file", entry?.Hash, file.Hash);
|
||||
File.Delete(filePath);
|
||||
_fileDbManager.RemoveHashedFile(entry);
|
||||
Logger.LogDebug("Found file {file} with length {le}, decompressing download", fileHash, fileLengthBytes);
|
||||
var fileExtension = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase)).GamePaths[0].Split(".").Last();
|
||||
|
||||
byte[] compressedFileContent = new byte[fileLengthBytes];
|
||||
_ = await fileBlockStream.ReadAsync(compressedFileContent, token).ConfigureAwait(false);
|
||||
|
||||
var decompressedFile = LZ4Codec.Unwrap(compressedFileContent);
|
||||
var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension, false);
|
||||
await File.WriteAllBytesAsync(filePath, decompressedFile, token).ConfigureAwait(false);
|
||||
|
||||
PersistFileToStorage(fileHash, filePath);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Logger.LogWarning(e, "Error during decompression");
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogWarning(ex, "Issue creating cache entry");
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogError(ex, "Error during block file read");
|
||||
}
|
||||
finally
|
||||
{
|
||||
_orchestrator.ReleaseDownloadSlot();
|
||||
fileBlockStream?.Dispose();
|
||||
File.Delete(blockFile);
|
||||
}
|
||||
}).ConfigureAwait(false);
|
||||
|
||||
Logger.LogDebug("Download for {id} complete", gameObjectHandler);
|
||||
Logger.LogDebug("Download end: {id}", gameObjectHandler);
|
||||
|
||||
CancelDownload();
|
||||
}
|
||||
|
||||
@@ -278,19 +307,37 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
return await response.Content.ReadFromJsonAsync<List<DownloadFileDto>>(cancellationToken: ct).ConfigureAwait(false) ?? new List<DownloadFileDto>();
|
||||
}
|
||||
|
||||
private async Task<Guid> GetQueueRequest(DownloadFileTransfer downloadFileTransfer, CancellationToken ct)
|
||||
private void PersistFileToStorage(string fileHash, string filePath)
|
||||
{
|
||||
var response = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestRequestFileFullPath(downloadFileTransfer.DownloadUri, downloadFileTransfer.Hash), ct).ConfigureAwait(false);
|
||||
var responseString = await response.Content.ReadAsStringAsync(ct).ConfigureAwait(false);
|
||||
var requestId = Guid.Parse(responseString.Trim('"'));
|
||||
if (!_downloadReady.ContainsKey(requestId))
|
||||
var fi = new FileInfo(filePath);
|
||||
Func<DateTime> RandomDayInThePast()
|
||||
{
|
||||
_downloadReady[requestId] = false;
|
||||
DateTime start = new(1995, 1, 1);
|
||||
Random gen = new();
|
||||
int range = (DateTime.Today - start).Days;
|
||||
return () => start.AddDays(gen.Next(range));
|
||||
}
|
||||
|
||||
fi.CreationTime = RandomDayInThePast().Invoke();
|
||||
fi.LastAccessTime = DateTime.Today;
|
||||
fi.LastWriteTime = RandomDayInThePast().Invoke();
|
||||
try
|
||||
{
|
||||
var entry = _fileDbManager.CreateCacheEntry(filePath);
|
||||
if (!string.Equals(entry?.Hash, fileHash, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
Logger.LogError("Hash mismatch after extracting, got {hash}, expected {expectedHash}, deleting file", entry?.Hash, fileHash);
|
||||
File.Delete(filePath);
|
||||
_fileDbManager.RemoveHashedFile(entry);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogWarning(ex, "Error creating cache entry");
|
||||
}
|
||||
return requestId;
|
||||
}
|
||||
|
||||
private async Task WaitForDownloadReady(DownloadFileTransfer downloadFileTransfer, Guid requestId, CancellationToken downloadCt)
|
||||
private async Task WaitForDownloadReady(List<DownloadFileTransfer> downloadFileTransfer, Guid requestId, CancellationToken downloadCt)
|
||||
{
|
||||
bool alreadyCancelled = false;
|
||||
try
|
||||
@@ -309,7 +356,8 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
{
|
||||
if (downloadCt.IsCancellationRequested) throw;
|
||||
|
||||
var req = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCheckQueueFullPath(downloadFileTransfer.DownloadUri, requestId, downloadFileTransfer.Hash), downloadCt).ConfigureAwait(false);
|
||||
var req = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCheckQueueFullPath(downloadFileTransfer[0].DownloadUri, requestId),
|
||||
downloadFileTransfer.Select(c => c.Hash).ToList(), downloadCt).ConfigureAwait(false);
|
||||
req.EnsureSuccessStatusCode();
|
||||
localTimeoutCts.Dispose();
|
||||
composite.Dispose();
|
||||
@@ -328,7 +376,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
{
|
||||
try
|
||||
{
|
||||
await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer.DownloadUri, requestId)).ConfigureAwait(false);
|
||||
await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false);
|
||||
alreadyCancelled = true;
|
||||
}
|
||||
catch
|
||||
@@ -344,7 +392,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
{
|
||||
try
|
||||
{
|
||||
await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer.DownloadUri, requestId)).ConfigureAwait(false);
|
||||
await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user