Download rework (#36)

* handle download ready from signalr

* add cancellation to the server

* adjust queue for removal and cancellation

* adjust api to main

Co-authored-by: rootdarkarchon <root.darkarchon@outlook.com>
This commit is contained in:
rootdarkarchon
2023-01-15 16:00:10 +01:00
committed by GitHub
parent f5f9b9c6fc
commit 5a16a15e8b
6 changed files with 74 additions and 72 deletions

Submodule MareAPI updated: f1c0fc76a9...5ac8a753dd

View File

@@ -168,8 +168,10 @@ public class CachedPlayer
} }
_downloadCancellationTokenSource?.Cancel(); _downloadCancellationTokenSource?.Cancel();
_downloadCancellationTokenSource?.Dispose();
_downloadCancellationTokenSource = new CancellationTokenSource(); _downloadCancellationTokenSource = new CancellationTokenSource();
var downloadToken = _downloadCancellationTokenSource.Token; var downloadToken = _downloadCancellationTokenSource.Token;
var downloadId = _apiController.GetDownloadId(); var downloadId = _apiController.GetDownloadId();
Task.Run(async () => Task.Run(async () =>
{ {
@@ -218,8 +220,6 @@ public class CachedPlayer
Logger.Debug("Download Task was cancelled"); Logger.Debug("Download Task was cancelled");
_apiController.CancelDownload(downloadId); _apiController.CancelDownload(downloadId);
}); });
_downloadCancellationTokenSource = null;
} }
private List<FileReplacementDto> TryCalculateModdedDictionary(out Dictionary<string, string> moddedDictionary) private List<FileReplacementDto> TryCalculateModdedDictionary(out Dictionary<string, string> moddedDictionary)

View File

@@ -63,7 +63,8 @@ public class CompactUi : Window, IDisposable
this.WindowName = "Mare Synchronos " + dateTime + "###MareSynchronosMainUI"; this.WindowName = "Mare Synchronos " + dateTime + "###MareSynchronosMainUI";
Toggle(); Toggle();
#else #else
this.WindowName = "Mare Synchronos " + Assembly.GetExecutingAssembly().GetName().Version + "###MareSynchronosMainUI"; var ver = Assembly.GetExecutingAssembly().GetName().Version;
this.WindowName = "Mare Synchronos " + ver.Major + "." + ver.Minor + "." + ver.Revision + "###MareSynchronosMainUI";
#endif #endif
Logger.Verbose("Creating " + nameof(CompactUi)); Logger.Verbose("Creating " + nameof(CompactUi));

View File

@@ -1,4 +1,5 @@
using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
@@ -15,13 +16,13 @@ using MareSynchronos.API;
using MareSynchronos.Utils; using MareSynchronos.Utils;
using MareSynchronos.WebAPI.Utils; using MareSynchronos.WebAPI.Utils;
using Microsoft.AspNetCore.SignalR.Client; using Microsoft.AspNetCore.SignalR.Client;
using Newtonsoft.Json;
namespace MareSynchronos.WebAPI; namespace MareSynchronos.WebAPI;
public partial class ApiController public partial class ApiController
{ {
private readonly Dictionary<string, DateTime> _verifiedUploadedHashes; private readonly Dictionary<string, DateTime> _verifiedUploadedHashes;
private readonly ConcurrentDictionary<Guid, bool> _downloadReady = new();
private int _downloadId = 0; private int _downloadId = 0;
public async void CancelUpload() public async void CancelUpload()
@@ -46,94 +47,80 @@ public partial class ApiController
await _mareHub!.SendAsync(nameof(FilesDeleteAll)).ConfigureAwait(false); await _mareHub!.SendAsync(nameof(FilesDeleteAll)).ConfigureAwait(false);
} }
private async Task<QueueRequestDto> GetQueueRequestDto(DownloadFileTransfer downloadFileTransfer) private async Task<Guid> GetQueueRequest(DownloadFileTransfer downloadFileTransfer, CancellationToken ct)
{ {
var response = await SendRequestAsync<object>(HttpMethod.Get, MareFiles.RequestRequestFileFullPath(downloadFileTransfer.DownloadUri, downloadFileTransfer.Hash)).ConfigureAwait(false); var response = await SendRequestAsync<object>(HttpMethod.Get, MareFiles.RequestRequestFileFullPath(downloadFileTransfer.DownloadUri, downloadFileTransfer.Hash), ct: ct).ConfigureAwait(false);
return JsonConvert.DeserializeObject<QueueRequestDto>(await response.Content.ReadAsStringAsync().ConfigureAwait(false))!; var responseString = await response.Content.ReadAsStringAsync(ct).ConfigureAwait(false);
var requestId = Guid.Parse(responseString.Trim('"'));
if (!_downloadReady.ContainsKey(requestId))
{
_downloadReady[requestId] = false;
}
return requestId;
} }
private async Task<Guid> WaitForQueue(DownloadFileTransfer fileTransfer, Guid requestId, CancellationToken ct) private async Task WaitForDownloadReady(DownloadFileTransfer downloadFileTransfer, Guid requestId, CancellationToken ct)
{ {
while (!ct.IsCancellationRequested) bool alreadyCancelled = false;
try
{
while (!ct.IsCancellationRequested && _downloadReady.TryGetValue(requestId, out bool isReady) && !isReady)
{
Logger.Verbose($"Waiting for {requestId} to become ready for download");
await Task.Delay(250, ct).ConfigureAwait(false);
}
Logger.Debug($"Download {requestId} ready");
}
catch (TaskCanceledException)
{ {
await Task.Delay(500, ct).ConfigureAwait(false);
var queueResponse = await SendRequestAsync<object>(HttpMethod.Get, MareFiles.RequestCheckQueueFullPath(fileTransfer.DownloadUri, requestId)).ConfigureAwait(false);
try try
{ {
queueResponse.EnsureSuccessStatusCode(); await SendRequestAsync<object>(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer.DownloadUri, requestId), ct).ConfigureAwait(false);
Logger.Debug($"Starting download for file {fileTransfer.Hash} ({requestId})"); alreadyCancelled = true;
break;
} }
catch (HttpRequestException ex) catch { }
{
switch (ex.StatusCode)
{
case HttpStatusCode.Conflict:
Logger.Debug($"In queue for file {fileTransfer.Hash} ({requestId})");
// still in queue
break;
case HttpStatusCode.BadRequest:
// rerequest queue
Logger.Debug($"Rerequesting {fileTransfer.Hash}");
var dto = await GetQueueRequestDto(fileTransfer).ConfigureAwait(false);
requestId = dto.RequestId;
break;
default:
Logger.Warn($"Unclear response from server: {fileTransfer.Hash} ({requestId}): {ex.StatusCode}");
break;
}
await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false); throw;
} }
finally
{
if (ct.IsCancellationRequested && !alreadyCancelled)
{
try
{
await SendRequestAsync<object>(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer.DownloadUri, requestId), ct).ConfigureAwait(false);
}
catch { }
}
_downloadReady.Remove(requestId, out _);
} }
return requestId;
} }
private async Task<string> DownloadFileHttpClient(DownloadFileTransfer fileTransfer, IProgress<long> progress, CancellationToken ct) private async Task<string> DownloadFileHttpClient(DownloadFileTransfer fileTransfer, IProgress<long> progress, CancellationToken ct)
{ {
var queueRequest = await GetQueueRequestDto(fileTransfer).ConfigureAwait(false); var requestId = await GetQueueRequest(fileTransfer, ct).ConfigureAwait(false);
Logger.Debug($"GUID {queueRequest.RequestId} for file {fileTransfer.Hash}, queue status {queueRequest.QueueStatus}"); Logger.Debug($"GUID {requestId} for file {fileTransfer.Hash} on server {fileTransfer.DownloadUri}");
var requestId = queueRequest.QueueStatus == QueueStatus.Ready await WaitForDownloadReady(fileTransfer, requestId, ct).ConfigureAwait(false);
? queueRequest.RequestId
: await WaitForQueue(fileTransfer, queueRequest.RequestId, ct).ConfigureAwait(false);
int attempts = 1;
bool failed = true;
const int maxAttempts = 16;
HttpResponseMessage response = null!; HttpResponseMessage response = null!;
HttpStatusCode? lastError = HttpStatusCode.OK;
var requestUrl = MareFiles.CacheGetFullPath(fileTransfer.DownloadUri, requestId); var requestUrl = MareFiles.CacheGetFullPath(fileTransfer.DownloadUri, requestId);
Logger.Debug($"Downloading {requestUrl} for file {fileTransfer.Hash}"); Logger.Debug($"Downloading {requestUrl} for file {fileTransfer.Hash}");
while (failed && attempts < maxAttempts && !ct.IsCancellationRequested) try
{ {
try response = await SendRequestAsync<object>(HttpMethod.Get, requestUrl, ct: ct).ConfigureAwait(false);
{ response.EnsureSuccessStatusCode();
response = await SendRequestAsync<object>(HttpMethod.Get, requestUrl, ct: ct).ConfigureAwait(false);
response.EnsureSuccessStatusCode();
failed = false;
}
catch (HttpRequestException ex)
{
Logger.Warn($"Attempt {attempts}: Error during download of {requestUrl}, HttpStatusCode: {ex.StatusCode}");
lastError = ex.StatusCode;
if (ex.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.Unauthorized)
{
break;
}
attempts++;
await Task.Delay(TimeSpan.FromSeconds(new Random().NextDouble() * 2), ct).ConfigureAwait(false);
}
} }
catch (HttpRequestException ex)
if (failed)
{ {
throw new Exception($"Http error {lastError} after {maxAttempts} attempts (cancelled: {ct.IsCancellationRequested}): {requestUrl}"); Logger.Warn($"Error during download of {requestUrl}, HttpStatusCode: {ex.StatusCode}");
if (ex.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.Unauthorized)
{
throw new Exception($"Http error {ex.StatusCode} (cancelled: {ct.IsCancellationRequested}): {requestUrl}", ex);
}
} }
var fileName = Path.GetTempFileName(); var fileName = Path.GetTempFileName();
@@ -259,16 +246,16 @@ public partial class ApiController
if (token.IsCancellationRequested) if (token.IsCancellationRequested)
{ {
File.Delete(tempFile); File.Delete(tempFile);
Logger.Debug("Detected cancellation, removing " + currentDownloadId); Logger.Debug("Detetokened cancellation, removing " + currentDownloadId);
CancelDownload(currentDownloadId); CancelDownload(currentDownloadId);
return; return;
} }
var tempFileData = await File.ReadAllBytesAsync(tempFile, token).ConfigureAwait(false); var tempFileData = await File.ReadAllBytesAsync(tempFile, token).ConfigureAwait(false);
var extractedFile = LZ4Codec.Unwrap(tempFileData); var extratokenedFile = LZ4Codec.Unwrap(tempFileData);
File.Delete(tempFile); File.Delete(tempFile);
var filePath = Path.Combine(_pluginConfiguration.CacheFolder, file.Hash); var filePath = Path.Combine(_pluginConfiguration.CacheFolder, file.Hash);
await File.WriteAllBytesAsync(filePath, extractedFile, token).ConfigureAwait(false); await File.WriteAllBytesAsync(filePath, extratokenedFile, token).ConfigureAwait(false);
var fi = new FileInfo(filePath); var fi = new FileInfo(filePath);
Func<DateTime> RandomDayInThePast() Func<DateTime> RandomDayInThePast()
{ {

View File

@@ -84,6 +84,12 @@ public partial class ApiController
_mareHub!.On(nameof(Client_ReceiveServerMessage), act); _mareHub!.On(nameof(Client_ReceiveServerMessage), act);
} }
public void OnDownloadReady(Action<Guid> act)
{
if (_initialized) return;
_mareHub!.On(nameof(Client_DownloadReady), act);
}
public Task Client_UserUpdateClientPairs(ClientPairDto dto) public Task Client_UserUpdateClientPairs(ClientPairDto dto)
{ {
var entry = PairedClients.SingleOrDefault(e => string.Equals(e.OtherUID, dto.OtherUID, System.StringComparison.Ordinal)); var entry = PairedClients.SingleOrDefault(e => string.Equals(e.OtherUID, dto.OtherUID, System.StringComparison.Ordinal));
@@ -243,4 +249,11 @@ public partial class ApiController
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task Client_DownloadReady(Guid requestId)
{
Logger.Debug($"Server sent {requestId} ready");
_downloadReady[requestId] = true;
return Task.CompletedTask;
}
} }

View File

@@ -299,6 +299,7 @@ public partial class ApiController : IDisposable, IMareHubClient
OnUserReceiveCharacterData((dto, ident) => Client_UserReceiveCharacterData(dto, ident)); OnUserReceiveCharacterData((dto, ident) => Client_UserReceiveCharacterData(dto, ident));
OnGroupChange(async (dto) => await Client_GroupChange(dto).ConfigureAwait(false)); OnGroupChange(async (dto) => await Client_GroupChange(dto).ConfigureAwait(false));
OnGroupUserChange((dto) => Client_GroupUserChange(dto)); OnGroupUserChange((dto) => Client_GroupUserChange(dto));
OnDownloadReady((guid) => Client_DownloadReady(guid));
OnAdminForcedReconnect(() => Client_AdminForcedReconnect()); OnAdminForcedReconnect(() => Client_AdminForcedReconnect());