add download throttling, change header of mare, fix reverting players when going offline/paused when not visible
This commit is contained in:
@@ -19,6 +19,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
private readonly FileCompactor _fileCompactor;
|
||||
private readonly FileCacheManager _fileDbManager;
|
||||
private readonly FileTransferOrchestrator _orchestrator;
|
||||
private readonly List<ThrottledStream> _activeDownloadStreams;
|
||||
|
||||
public FileDownloadManager(ILogger<FileDownloadManager> logger, MareMediator mediator,
|
||||
FileTransferOrchestrator orchestrator,
|
||||
@@ -28,6 +29,18 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
_orchestrator = orchestrator;
|
||||
_fileDbManager = fileCacheManager;
|
||||
_fileCompactor = fileCompactor;
|
||||
_activeDownloadStreams = [];
|
||||
|
||||
Mediator.Subscribe<DownloadLimitChangedMessage>(this, (msg) =>
|
||||
{
|
||||
if (!_activeDownloadStreams.Any()) return;
|
||||
var newLimit = _orchestrator.DownloadLimitPerSlot();
|
||||
Logger.LogTrace("Setting new Download Speed Limit to {newLimit}", newLimit);
|
||||
foreach (var stream in _activeDownloadStreams)
|
||||
{
|
||||
stream.BandwidthLimit = newLimit;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public List<DownloadFileTransfer> CurrentDownloads { get; private set; } = [];
|
||||
@@ -71,6 +84,14 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
CancelDownload();
|
||||
foreach (var stream in _activeDownloadStreams)
|
||||
{
|
||||
try
|
||||
{
|
||||
stream.Dispose();
|
||||
}
|
||||
catch { }
|
||||
}
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
|
||||
@@ -133,6 +154,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
}
|
||||
}
|
||||
|
||||
ThrottledStream? stream = null;
|
||||
try
|
||||
{
|
||||
var fileStream = File.Create(tempPath);
|
||||
@@ -142,7 +164,10 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
var buffer = new byte[bufferSize];
|
||||
|
||||
var bytesRead = 0;
|
||||
var stream = await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false);
|
||||
var limit = _orchestrator.DownloadLimitPerSlot();
|
||||
Logger.LogTrace("Starting Download of {id} with a speed limit of {limit}", requestId, limit);
|
||||
stream = new ThrottledStream(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit);
|
||||
_activeDownloadStreams.Add(stream);
|
||||
while ((bytesRead = await stream.ReadAsync(buffer, ct).ConfigureAwait(false)) > 0)
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
@@ -171,6 +196,14 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
}
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (stream != null)
|
||||
{
|
||||
_activeDownloadStreams.Remove(stream);
|
||||
await stream.DisposeAsync().ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task DownloadFilesInternal(GameObjectHandler gameObjectHandler, List<FileReplacementData> fileReplacement, CancellationToken ct)
|
||||
|
||||
@@ -19,6 +19,7 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase
|
||||
private readonly TokenProvider _tokenProvider;
|
||||
private int _availableDownloadSlots;
|
||||
private SemaphoreSlim _downloadSemaphore;
|
||||
private int CurrentlyUsedDownloadSlots => _availableDownloadSlots - _downloadSemaphore.CurrentCount;
|
||||
|
||||
public FileTransferOrchestrator(ILogger<FileTransferOrchestrator> logger, MareConfigService mareConfig,
|
||||
MareMediator mediator, TokenProvider tokenProvider) : base(logger, mediator)
|
||||
@@ -72,6 +73,7 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase
|
||||
public void ReleaseDownloadSlot()
|
||||
{
|
||||
_downloadSemaphore.Release();
|
||||
Mediator.Publish(new DownloadLimitChangedMessage());
|
||||
}
|
||||
|
||||
public async Task<HttpResponseMessage> SendRequestAsync(HttpMethod method, Uri uri,
|
||||
@@ -110,6 +112,22 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase
|
||||
}
|
||||
|
||||
await _downloadSemaphore.WaitAsync(token).ConfigureAwait(false);
|
||||
Mediator.Publish(new DownloadLimitChangedMessage());
|
||||
}
|
||||
|
||||
public long DownloadLimitPerSlot()
|
||||
{
|
||||
var limit = _mareConfig.Current.DownloadSpeedLimitInBytes;
|
||||
if (limit <= 0) return 0;
|
||||
limit = _mareConfig.Current.DownloadSpeedType switch
|
||||
{
|
||||
MareConfiguration.Models.DownloadSpeeds.Bps => limit,
|
||||
MareConfiguration.Models.DownloadSpeeds.KBps => limit * 1024,
|
||||
MareConfiguration.Models.DownloadSpeeds.MBps => limit * 1024 * 1024,
|
||||
_ => limit,
|
||||
};
|
||||
var dividedLimit = limit / (CurrentlyUsedDownloadSlots == 0 ? 1 : CurrentlyUsedDownloadSlots);
|
||||
return dividedLimit == 0 ? 1 : dividedLimit;
|
||||
}
|
||||
|
||||
private async Task<HttpResponseMessage> SendRequestInternalAsync(HttpRequestMessage requestMessage,
|
||||
|
||||
217
MareSynchronos/WebAPI/Files/ThrottledStream.cs
Normal file
217
MareSynchronos/WebAPI/Files/ThrottledStream.cs
Normal file
@@ -0,0 +1,217 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace MareSynchronos.WebAPI.Files
|
||||
{
|
||||
/// <summary>
|
||||
/// Class for streaming data with throttling support.
|
||||
/// Borrowed from https://github.com/bezzad/Downloader
|
||||
/// </summary>
|
||||
internal class ThrottledStream : Stream
|
||||
{
|
||||
public static long Infinite => long.MaxValue;
|
||||
private readonly Stream _baseStream;
|
||||
private long _bandwidthLimit;
|
||||
private Bandwidth _bandwidth;
|
||||
private CancellationTokenSource _bandwidthChangeTokenSource = new CancellationTokenSource();
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="T:ThrottledStream" /> class.
|
||||
/// </summary>
|
||||
/// <param name="baseStream">The base stream.</param>
|
||||
/// <param name="bandwidthLimit">The maximum bytes per second that can be transferred through the base stream.</param>
|
||||
/// <exception cref="ArgumentNullException">Thrown when <see cref="baseStream" /> is a null reference.</exception>
|
||||
/// <exception cref="ArgumentOutOfRangeException">Thrown when <see cref="BandwidthLimit" /> is a negative value.</exception>
|
||||
public ThrottledStream(Stream baseStream, long bandwidthLimit)
|
||||
{
|
||||
if (bandwidthLimit < 0)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(bandwidthLimit),
|
||||
bandwidthLimit, "The maximum number of bytes per second can't be negative.");
|
||||
}
|
||||
|
||||
_baseStream = baseStream ?? throw new ArgumentNullException(nameof(baseStream));
|
||||
BandwidthLimit = bandwidthLimit;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Bandwidth Limit (in B/s)
|
||||
/// </summary>
|
||||
/// <value>The maximum bytes per second.</value>
|
||||
public long BandwidthLimit
|
||||
{
|
||||
get => _bandwidthLimit;
|
||||
set
|
||||
{
|
||||
if (_bandwidthLimit == value) return;
|
||||
_bandwidthLimit = value <= 0 ? Infinite : value;
|
||||
_bandwidth ??= new Bandwidth();
|
||||
_bandwidth.BandwidthLimit = _bandwidthLimit;
|
||||
_bandwidthChangeTokenSource.Cancel();
|
||||
_bandwidthChangeTokenSource.Dispose();
|
||||
_bandwidthChangeTokenSource = new();
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override bool CanRead => _baseStream.CanRead;
|
||||
|
||||
/// <inheritdoc />
|
||||
public override bool CanSeek => _baseStream.CanSeek;
|
||||
|
||||
/// <inheritdoc />
|
||||
public override bool CanWrite => _baseStream.CanWrite;
|
||||
|
||||
/// <inheritdoc />
|
||||
public override long Length => _baseStream.Length;
|
||||
|
||||
/// <inheritdoc />
|
||||
public override long Position
|
||||
{
|
||||
get => _baseStream.Position;
|
||||
set => _baseStream.Position = value;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override void Flush()
|
||||
{
|
||||
_baseStream.Flush();
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override long Seek(long offset, SeekOrigin origin)
|
||||
{
|
||||
return _baseStream.Seek(offset, origin);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override void SetLength(long value)
|
||||
{
|
||||
_baseStream.SetLength(value);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override int Read(byte[] buffer, int offset, int count)
|
||||
{
|
||||
Throttle(count).Wait();
|
||||
return _baseStream.Read(buffer, offset, count);
|
||||
}
|
||||
|
||||
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
await Throttle(count, cancellationToken).ConfigureAwait(false);
|
||||
return await _baseStream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override void Write(byte[] buffer, int offset, int count)
|
||||
{
|
||||
Throttle(count).Wait();
|
||||
_baseStream.Write(buffer, offset, count);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
|
||||
{
|
||||
await Throttle(count, cancellationToken).ConfigureAwait(false);
|
||||
await _baseStream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public override void Close()
|
||||
{
|
||||
_baseStream.Close();
|
||||
base.Close();
|
||||
}
|
||||
|
||||
private async Task Throttle(int transmissionVolume, CancellationToken token = default)
|
||||
{
|
||||
// Make sure the buffer isn't empty.
|
||||
if (BandwidthLimit > 0 && transmissionVolume > 0)
|
||||
{
|
||||
// Calculate the time to sleep.
|
||||
_bandwidth.CalculateSpeed(transmissionVolume);
|
||||
await Sleep(_bandwidth.PopSpeedRetrieveTime(), token).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task Sleep(int time, CancellationToken token = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (time > 0)
|
||||
{
|
||||
var bandWidthtoken = _bandwidthChangeTokenSource.Token;
|
||||
var linked = CancellationTokenSource.CreateLinkedTokenSource(token, bandWidthtoken).Token;
|
||||
await Task.Delay(time, linked).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
catch (TaskCanceledException)
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override string ToString()
|
||||
{
|
||||
return _baseStream.ToString();
|
||||
}
|
||||
|
||||
private class Bandwidth
|
||||
{
|
||||
private long _count;
|
||||
private int _lastSecondCheckpoint;
|
||||
private long _lastTransferredBytesCount;
|
||||
private int _speedRetrieveTime;
|
||||
public double Speed { get; private set; }
|
||||
public double AverageSpeed { get; private set; }
|
||||
public long BandwidthLimit { get; set; }
|
||||
|
||||
public Bandwidth()
|
||||
{
|
||||
BandwidthLimit = long.MaxValue;
|
||||
Reset();
|
||||
}
|
||||
|
||||
public void CalculateSpeed(long receivedBytesCount)
|
||||
{
|
||||
int elapsedTime = Environment.TickCount - _lastSecondCheckpoint + 1;
|
||||
receivedBytesCount = Interlocked.Add(ref _lastTransferredBytesCount, receivedBytesCount);
|
||||
double momentSpeed = receivedBytesCount * 1000 / elapsedTime; // B/s
|
||||
|
||||
if (1000 < elapsedTime)
|
||||
{
|
||||
Speed = momentSpeed;
|
||||
AverageSpeed = ((AverageSpeed * _count) + Speed) / (_count + 1);
|
||||
_count++;
|
||||
SecondCheckpoint();
|
||||
}
|
||||
|
||||
if (momentSpeed >= BandwidthLimit)
|
||||
{
|
||||
var expectedTime = receivedBytesCount * 1000 / BandwidthLimit;
|
||||
Interlocked.Add(ref _speedRetrieveTime, (int)expectedTime - elapsedTime);
|
||||
}
|
||||
}
|
||||
|
||||
public int PopSpeedRetrieveTime()
|
||||
{
|
||||
return Interlocked.Exchange(ref _speedRetrieveTime, 0);
|
||||
}
|
||||
|
||||
public void Reset()
|
||||
{
|
||||
SecondCheckpoint();
|
||||
_count = 0;
|
||||
Speed = 0;
|
||||
AverageSpeed = 0;
|
||||
}
|
||||
|
||||
private void SecondCheckpoint()
|
||||
{
|
||||
Interlocked.Exchange(ref _lastSecondCheckpoint, Environment.TickCount);
|
||||
Interlocked.Exchange(ref _lastTransferredBytesCount, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user