diff --git a/MareSynchronos/Interop/IpcManager.cs b/MareSynchronos/Interop/IpcManager.cs index 6fe2327..9dbad06 100644 --- a/MareSynchronos/Interop/IpcManager.cs +++ b/MareSynchronos/Interop/IpcManager.cs @@ -322,6 +322,26 @@ public sealed class IpcManager : DisposableMediatorSubscriberBase } } + public async Task GlamourerRevertByNameAsync(ILogger logger, string name, Guid applicationId) + { + if ((!CheckGlamourerApi()) || _dalamudUtil.IsZoning) return; + + await _dalamudUtil.RunOnFrameworkThread(() => + { + try + { + logger.LogDebug("[{appid}] Calling On IPC: GlamourerRevertByName", applicationId); + _glamourerRevertByName.InvokeAction(name, LockCode); + logger.LogDebug("[{appid}] Calling On IPC: GlamourerUnlockName", applicationId); + _glamourerUnlock.InvokeFunc(name, LockCode); + } + catch (Exception ex) + { + Logger.LogWarning(ex, "Error during Glamourer RevertByName"); + } + }).ConfigureAwait(false); + } + public void GlamourerRevertByName(ILogger logger, string name, Guid applicationId) { if ((!CheckGlamourerApi()) || _dalamudUtil.IsZoning) return; diff --git a/MareSynchronos/MareConfiguration/Configurations/MareConfig.cs b/MareSynchronos/MareConfiguration/Configurations/MareConfig.cs index 013c0eb..1099395 100644 --- a/MareSynchronos/MareConfiguration/Configurations/MareConfig.cs +++ b/MareSynchronos/MareConfiguration/Configurations/MareConfig.cs @@ -24,6 +24,8 @@ public class MareConfig : IMareConfiguration public bool OpenGposeImportOnGposeStart { get; set; } = false; public bool OpenPopupOnAdd { get; set; } = true; public int ParallelDownloads { get; set; } = 10; + public int DownloadSpeedLimitInBytes { get; set; } = 0; + public DownloadSpeeds DownloadSpeedType { get; set; } = DownloadSpeeds.MBps; public bool PreferNotesOverNamesForVisible { get; set; } = false; public float ProfileDelay { get; set; } = 1.5f; public bool ProfilePopoutRight { get; set; } = false; diff --git a/MareSynchronos/MareConfiguration/Models/DownloadSpeeds.cs b/MareSynchronos/MareConfiguration/Models/DownloadSpeeds.cs new file mode 100644 index 0000000..815da1f --- /dev/null +++ b/MareSynchronos/MareConfiguration/Models/DownloadSpeeds.cs @@ -0,0 +1,8 @@ +namespace MareSynchronos.MareConfiguration.Models; + +public enum DownloadSpeeds +{ + Bps, + KBps, + MBps +} \ No newline at end of file diff --git a/MareSynchronos/MareSynchronos.csproj b/MareSynchronos/MareSynchronos.csproj index 6d2c8e5..3a86763 100644 --- a/MareSynchronos/MareSynchronos.csproj +++ b/MareSynchronos/MareSynchronos.csproj @@ -31,6 +31,7 @@ + all diff --git a/MareSynchronos/PlayerData/Handlers/PairHandler.cs b/MareSynchronos/PlayerData/Handlers/PairHandler.cs index 2f3693e..6778809 100644 --- a/MareSynchronos/PlayerData/Handlers/PairHandler.cs +++ b/MareSynchronos/PlayerData/Handlers/PairHandler.cs @@ -191,22 +191,35 @@ public sealed class PairHandler : DisposableMediatorSubscriberBase if (_lifetime.ApplicationStopping.IsCancellationRequested) return; - if (_dalamudUtil is { IsZoning: false, IsInCutscene: false }) + if (_dalamudUtil is { IsZoning: false, IsInCutscene: false } && !string.IsNullOrEmpty(name)) { Logger.LogTrace("[{applicationId}] Restoring state for {name} ({OnlineUser})", applicationId, name, OnlineUser); + Logger.LogDebug("[{applicationId}] Removing Temp Collection for {name} ({user})", applicationId, name, OnlineUser); _ipcManager.PenumbraRemoveTemporaryCollectionAsync(Logger, applicationId, _penumbraCollection).GetAwaiter().GetResult(); - - foreach (KeyValuePair> item in _cachedData?.FileReplacements ?? []) + if (!IsVisible) { - try + Logger.LogDebug("[{applicationId}] Restoring Glamourer for {name} ({user})", applicationId, name, OnlineUser); + _ipcManager.GlamourerRevertByNameAsync(Logger, name, applicationId).GetAwaiter().GetResult(); + } + else + { + var cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromSeconds(60)); + + foreach (KeyValuePair> item in _cachedData?.FileReplacements ?? []) { - RevertCustomizationDataAsync(item.Key, name, applicationId).GetAwaiter().GetResult(); - } - catch (InvalidOperationException ex) - { - Logger.LogWarning(ex, "Failed disposing player (not present anymore?)"); - break; + try + { + RevertCustomizationDataAsync(item.Key, name, applicationId, cts.Token).GetAwaiter().GetResult(); + } + catch (InvalidOperationException ex) + { + Logger.LogWarning(ex, "Failed disposing player (not present anymore?)"); + break; + } } + + cts.CancelDispose(); } } } @@ -477,14 +490,11 @@ public sealed class PairHandler : DisposableMediatorSubscriberBase _ipcManager.PenumbraAssignTemporaryCollectionAsync(Logger, _penumbraCollection, _charaHandler.GetGameObject()!.ObjectIndex).GetAwaiter().GetResult(); } - private async Task RevertCustomizationDataAsync(ObjectKind objectKind, string name, Guid applicationId) + private async Task RevertCustomizationDataAsync(ObjectKind objectKind, string name, Guid applicationId, CancellationToken cancelToken) { nint address = _dalamudUtil.GetPlayerCharacterFromCachedTableByIdent(OnlineUser.Ident); if (address == nint.Zero) return; - var cancelToken = new CancellationTokenSource(); - cancelToken.CancelAfter(TimeSpan.FromSeconds(60)); - Logger.LogDebug("[{applicationId}] Reverting all Customization for {alias}/{name} {objectKind}", applicationId, OnlineUser.User.AliasOrUID, name, objectKind); if (objectKind == ObjectKind.Player) @@ -492,7 +502,7 @@ public sealed class PairHandler : DisposableMediatorSubscriberBase using GameObjectHandler tempHandler = await _gameObjectHandlerFactory.Create(ObjectKind.Player, () => address, isWatched: false).ConfigureAwait(false); tempHandler.CompareNameAndThrow(name); Logger.LogDebug("[{applicationId}] Restoring Customization and Equipment for {alias}/{name}", applicationId, OnlineUser.User.AliasOrUID, name); - await _ipcManager.GlamourerRevert(Logger, name, tempHandler, applicationId, cancelToken.Token).ConfigureAwait(false); + await _ipcManager.GlamourerRevert(Logger, name, tempHandler, applicationId, cancelToken).ConfigureAwait(false); tempHandler.CompareNameAndThrow(name); Logger.LogDebug("[{applicationId}] Restoring Heels for {alias}/{name}", applicationId, OnlineUser.User.AliasOrUID, name); await _ipcManager.HeelsRestoreOffsetForPlayerAsync(address).ConfigureAwait(false); @@ -513,8 +523,8 @@ public sealed class PairHandler : DisposableMediatorSubscriberBase { await _ipcManager.CustomizePlusRevertAsync(minionOrMount).ConfigureAwait(false); using GameObjectHandler tempHandler = await _gameObjectHandlerFactory.Create(ObjectKind.MinionOrMount, () => minionOrMount, isWatched: false).ConfigureAwait(false); - await _ipcManager.GlamourerRevert(Logger, tempHandler.Name, tempHandler, applicationId, cancelToken.Token).ConfigureAwait(false); - await _ipcManager.PenumbraRedrawAsync(Logger, tempHandler, applicationId, cancelToken.Token).ConfigureAwait(false); + await _ipcManager.GlamourerRevert(Logger, tempHandler.Name, tempHandler, applicationId, cancelToken).ConfigureAwait(false); + await _ipcManager.PenumbraRedrawAsync(Logger, tempHandler, applicationId, cancelToken).ConfigureAwait(false); } } else if (objectKind == ObjectKind.Pet) @@ -524,8 +534,8 @@ public sealed class PairHandler : DisposableMediatorSubscriberBase { await _ipcManager.CustomizePlusRevertAsync(pet).ConfigureAwait(false); using GameObjectHandler tempHandler = await _gameObjectHandlerFactory.Create(ObjectKind.Pet, () => pet, isWatched: false).ConfigureAwait(false); - await _ipcManager.GlamourerRevert(Logger, tempHandler.Name, tempHandler, applicationId, cancelToken.Token).ConfigureAwait(false); - await _ipcManager.PenumbraRedrawAsync(Logger, tempHandler, applicationId, cancelToken.Token).ConfigureAwait(false); + await _ipcManager.GlamourerRevert(Logger, tempHandler.Name, tempHandler, applicationId, cancelToken).ConfigureAwait(false); + await _ipcManager.PenumbraRedrawAsync(Logger, tempHandler, applicationId, cancelToken).ConfigureAwait(false); } } else if (objectKind == ObjectKind.Companion) @@ -535,12 +545,10 @@ public sealed class PairHandler : DisposableMediatorSubscriberBase { await _ipcManager.CustomizePlusRevertAsync(companion).ConfigureAwait(false); using GameObjectHandler tempHandler = await _gameObjectHandlerFactory.Create(ObjectKind.Pet, () => companion, isWatched: false).ConfigureAwait(false); - await _ipcManager.GlamourerRevert(Logger, tempHandler.Name, tempHandler, applicationId, cancelToken.Token).ConfigureAwait(false); - await _ipcManager.PenumbraRedrawAsync(Logger, tempHandler, applicationId, cancelToken.Token).ConfigureAwait(false); + await _ipcManager.GlamourerRevert(Logger, tempHandler.Name, tempHandler, applicationId, cancelToken).ConfigureAwait(false); + await _ipcManager.PenumbraRedrawAsync(Logger, tempHandler, applicationId, cancelToken).ConfigureAwait(false); } } - - cancelToken.CancelDispose(); } private List TryCalculateModdedDictionary(Guid applicationBase, CharacterData charaData, out Dictionary moddedDictionary, CancellationToken token) diff --git a/MareSynchronos/PlayerData/Pairs/Pair.cs b/MareSynchronos/PlayerData/Pairs/Pair.cs index 764be38..83a9a5a 100644 --- a/MareSynchronos/PlayerData/Pairs/Pair.cs +++ b/MareSynchronos/PlayerData/Pairs/Pair.cs @@ -40,7 +40,7 @@ public class Pair public bool IsOnline => CachedPlayer != null; public bool IsPaired => IndividualPairStatus == IndividualPairStatus.Bidirectional || UserPair.Groups.Any(); - public bool IsPaused => UserPair.OtherPermissions.IsPaused() || UserPair.OwnPermissions.IsPaused(); + public bool IsPaused => UserPair.OwnPermissions.IsPaused(); public bool IsVisible => CachedPlayer?.IsVisible ?? false; public CharacterData? LastReceivedCharacterData { get; set; } public string? PlayerName => CachedPlayer?.PlayerName ?? string.Empty; diff --git a/MareSynchronos/Services/Mediator/Messages.cs b/MareSynchronos/Services/Mediator/Messages.cs index 85a69d4..c839d29 100644 --- a/MareSynchronos/Services/Mediator/Messages.cs +++ b/MareSynchronos/Services/Mediator/Messages.cs @@ -74,6 +74,7 @@ public record OpenReportPopupMessage(Pair PairToReport) : MessageBase; public record OpenBanUserPopupMessage(Pair PairToBan, GroupFullInfoDto GroupFullInfoDto) : MessageBase; public record OpenSyncshellAdminPanel(GroupFullInfoDto GroupInfo) : MessageBase; public record OpenPermissionWindow(Pair Pair) : MessageBase; +public record DownloadLimitChangedMessage() : SameThreadMessage; #pragma warning restore S2094 #pragma warning restore MA0048 // File name must match type name \ No newline at end of file diff --git a/MareSynchronos/UI/CompactUI.cs b/MareSynchronos/UI/CompactUI.cs index 74f8c59..15b5ea9 100644 --- a/MareSynchronos/UI/CompactUI.cs +++ b/MareSynchronos/UI/CompactUI.cs @@ -69,6 +69,21 @@ public class CompactUi : WindowMediatorSubscriberBase _selectPairsForGroupUi = selectPairForTagUi; _tabMenu = new TopTabMenu(Mediator, _apiController, _pairManager); + AllowPinning = false; + AllowClickthrough = false; + TitleBarButtons = new() + { + new TitleBarButton() + { + Icon = FontAwesomeIcon.Cog, + Click = (msg) => + { + Mediator.Publish(new UiToggleMessage(typeof(SettingsUi))); + }, + IconOffset = new(2,1) + } + }; + _drawFolders = GetDrawFolders().ToList(); #if DEBUG @@ -340,47 +355,35 @@ public class CompactUi : WindowMediatorSubscriberBase private void DrawUIDHeader() { var uidText = GetUidText(); - var buttonSizeX = 0f; - if (_uiShared.UidFontBuilt) ImGui.PushFont(_uiShared.UidFont); - var uidTextSize = ImGui.CalcTextSize(uidText); - if (_uiShared.UidFontBuilt) ImGui.PopFont(); - - var originalPos = ImGui.GetCursorPos(); - ImGui.SetWindowFontScale(1.5f); - var buttonSize = UiSharedService.GetIconButtonSize(FontAwesomeIcon.Cog); - buttonSizeX -= buttonSize.X - ImGui.GetStyle().ItemSpacing.X * 2; - ImGui.SameLine(ImGui.GetWindowContentRegionMin().X + UiSharedService.GetWindowContentRegionWidth() - buttonSize.X); - ImGui.SetCursorPosY(originalPos.Y + uidTextSize.Y / 2 - buttonSize.Y / 2); - if (ImGuiComponents.IconButton(FontAwesomeIcon.Cog)) + using (ImRaii.PushFont(_uiShared.UidFont, _uiShared.UidFontBuilt)) { - Mediator.Publish(new OpenSettingsUiMessage()); + var uidTextSize = ImGui.CalcTextSize(uidText); + ImGui.SetCursorPosX((ImGui.GetWindowContentRegionMax().X - ImGui.GetWindowContentRegionMin().X) / 2 - (uidTextSize.X / 2)); + ImGui.TextColored(GetUidColor(), uidText); } - UiSharedService.AttachToolTip("Open the Mare Synchronos Settings"); - - ImGui.SameLine(); //Important to draw the uidText consistently - ImGui.SetCursorPos(originalPos); if (_apiController.ServerState is ServerState.Connected) { - buttonSizeX += UiSharedService.GetIconButtonSize(FontAwesomeIcon.Copy).X - ImGui.GetStyle().ItemSpacing.X * 2; - ImGui.SetCursorPosY(originalPos.Y + uidTextSize.Y / 2 - buttonSize.Y / 2); - if (ImGuiComponents.IconButton(FontAwesomeIcon.Copy)) + if (ImGui.IsItemClicked()) { ImGui.SetClipboardText(_apiController.DisplayName); } - UiSharedService.AttachToolTip("Copy your UID to clipboard"); - ImGui.SameLine(); + UiSharedService.AttachToolTip("Click to copy"); + + if (!string.Equals(_apiController.DisplayName, _apiController.UID, StringComparison.Ordinal)) + { + var origTextSize = ImGui.CalcTextSize(_apiController.UID); + ImGui.SetCursorPosX((ImGui.GetWindowContentRegionMax().X - ImGui.GetWindowContentRegionMin().X) / 2 - (origTextSize.X / 2)); + ImGui.TextColored(GetUidColor(), _apiController.UID); + if (ImGui.IsItemClicked()) + { + ImGui.SetClipboardText(_apiController.UID); + } + UiSharedService.AttachToolTip("Click to copy"); + } } - ImGui.SetWindowFontScale(1f); - - ImGui.SetCursorPosY(originalPos.Y + buttonSize.Y / 2 - uidTextSize.Y / 2 - ImGui.GetStyle().ItemSpacing.Y / 2); - ImGui.SetCursorPosX((ImGui.GetWindowContentRegionMax().X + ImGui.GetWindowContentRegionMin().X) / 2 + buttonSizeX - uidTextSize.X / 2); - if (_uiShared.UidFontBuilt) ImGui.PushFont(_uiShared.UidFont); - ImGui.TextColored(GetUidColor(), uidText); - if (_uiShared.UidFontBuilt) ImGui.PopFont(); - - if (_apiController.ServerState is not ServerState.Connected) + else { UiSharedService.ColorTextWrapped(GetServerError(), GetUidColor()); if (_apiController.ServerState is ServerState.NoSecretKey) diff --git a/MareSynchronos/UI/SettingsUi.cs b/MareSynchronos/UI/SettingsUi.cs index a5bc263..1d7cc0b 100644 --- a/MareSynchronos/UI/SettingsUi.cs +++ b/MareSynchronos/UI/SettingsUi.cs @@ -69,6 +69,8 @@ public class SettingsUi : WindowMediatorSubscriberBase _apiController = apiController; _fileCompactor = fileCompactor; _uiShared = uiShared; + AllowClickthrough = false; + AllowPinning = false; SizeConstraints = new WindowSizeConstraints() { @@ -142,6 +144,37 @@ public class SettingsUi : WindowMediatorSubscriberBase int maxParallelDownloads = _configService.Current.ParallelDownloads; bool useAlternativeUpload = _configService.Current.UseAlternativeFileUpload; + int downloadSpeedLimit = _configService.Current.DownloadSpeedLimitInBytes; + + ImGui.AlignTextToFramePadding(); + ImGui.TextUnformatted("Global Download Speed Limit"); + ImGui.SameLine(); + ImGui.SetNextItemWidth(100); + if (ImGui.InputInt("###speedlimit", ref downloadSpeedLimit)) + { + _configService.Current.DownloadSpeedLimitInBytes = downloadSpeedLimit; + _configService.Save(); + Mediator.Publish(new DownloadLimitChangedMessage()); + } + ImGui.SameLine(); + ImGui.SetNextItemWidth(100); + _uiShared.DrawCombo("###speed", new[] { DownloadSpeeds.Bps, DownloadSpeeds.KBps, DownloadSpeeds.MBps }, + (s) => s switch + { + DownloadSpeeds.Bps => "Byte/s", + DownloadSpeeds.KBps => "KB/s", + DownloadSpeeds.MBps => "MB/s", + _ => throw new NotSupportedException() + }, (s) => + { + _configService.Current.DownloadSpeedType = s; + _configService.Save(); + Mediator.Publish(new DownloadLimitChangedMessage()); + }, _configService.Current.DownloadSpeedType); + ImGui.SameLine(); + ImGui.AlignTextToFramePadding(); + ImGui.TextUnformatted("0 = No limit/infinite"); + if (ImGui.SliderInt("Maximum Parallel Downloads", ref maxParallelDownloads, 1, 10)) { _configService.Current.ParallelDownloads = maxParallelDownloads; diff --git a/MareSynchronos/WebAPI/Files/FileDownloadManager.cs b/MareSynchronos/WebAPI/Files/FileDownloadManager.cs index 87290be..936a190 100644 --- a/MareSynchronos/WebAPI/Files/FileDownloadManager.cs +++ b/MareSynchronos/WebAPI/Files/FileDownloadManager.cs @@ -19,6 +19,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase private readonly FileCompactor _fileCompactor; private readonly FileCacheManager _fileDbManager; private readonly FileTransferOrchestrator _orchestrator; + private readonly List _activeDownloadStreams; public FileDownloadManager(ILogger logger, MareMediator mediator, FileTransferOrchestrator orchestrator, @@ -28,6 +29,18 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase _orchestrator = orchestrator; _fileDbManager = fileCacheManager; _fileCompactor = fileCompactor; + _activeDownloadStreams = []; + + Mediator.Subscribe(this, (msg) => + { + if (!_activeDownloadStreams.Any()) return; + var newLimit = _orchestrator.DownloadLimitPerSlot(); + Logger.LogTrace("Setting new Download Speed Limit to {newLimit}", newLimit); + foreach (var stream in _activeDownloadStreams) + { + stream.BandwidthLimit = newLimit; + } + }); } public List CurrentDownloads { get; private set; } = []; @@ -71,6 +84,14 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase protected override void Dispose(bool disposing) { CancelDownload(); + foreach (var stream in _activeDownloadStreams) + { + try + { + stream.Dispose(); + } + catch { } + } base.Dispose(disposing); } @@ -133,6 +154,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase } } + ThrottledStream? stream = null; try { var fileStream = File.Create(tempPath); @@ -142,7 +164,10 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase var buffer = new byte[bufferSize]; var bytesRead = 0; - var stream = await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false); + var limit = _orchestrator.DownloadLimitPerSlot(); + Logger.LogTrace("Starting Download of {id} with a speed limit of {limit}", requestId, limit); + stream = new ThrottledStream(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit); + _activeDownloadStreams.Add(stream); while ((bytesRead = await stream.ReadAsync(buffer, ct).ConfigureAwait(false)) > 0) { ct.ThrowIfCancellationRequested(); @@ -171,6 +196,14 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase } throw; } + finally + { + if (stream != null) + { + _activeDownloadStreams.Remove(stream); + await stream.DisposeAsync().ConfigureAwait(false); + } + } } private async Task DownloadFilesInternal(GameObjectHandler gameObjectHandler, List fileReplacement, CancellationToken ct) diff --git a/MareSynchronos/WebAPI/Files/FileTransferOrchestrator.cs b/MareSynchronos/WebAPI/Files/FileTransferOrchestrator.cs index ab8c991..f1764c6 100644 --- a/MareSynchronos/WebAPI/Files/FileTransferOrchestrator.cs +++ b/MareSynchronos/WebAPI/Files/FileTransferOrchestrator.cs @@ -19,6 +19,7 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase private readonly TokenProvider _tokenProvider; private int _availableDownloadSlots; private SemaphoreSlim _downloadSemaphore; + private int CurrentlyUsedDownloadSlots => _availableDownloadSlots - _downloadSemaphore.CurrentCount; public FileTransferOrchestrator(ILogger logger, MareConfigService mareConfig, MareMediator mediator, TokenProvider tokenProvider) : base(logger, mediator) @@ -72,6 +73,7 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase public void ReleaseDownloadSlot() { _downloadSemaphore.Release(); + Mediator.Publish(new DownloadLimitChangedMessage()); } public async Task SendRequestAsync(HttpMethod method, Uri uri, @@ -110,6 +112,22 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase } await _downloadSemaphore.WaitAsync(token).ConfigureAwait(false); + Mediator.Publish(new DownloadLimitChangedMessage()); + } + + public long DownloadLimitPerSlot() + { + var limit = _mareConfig.Current.DownloadSpeedLimitInBytes; + if (limit <= 0) return 0; + limit = _mareConfig.Current.DownloadSpeedType switch + { + MareConfiguration.Models.DownloadSpeeds.Bps => limit, + MareConfiguration.Models.DownloadSpeeds.KBps => limit * 1024, + MareConfiguration.Models.DownloadSpeeds.MBps => limit * 1024 * 1024, + _ => limit, + }; + var dividedLimit = limit / (CurrentlyUsedDownloadSlots == 0 ? 1 : CurrentlyUsedDownloadSlots); + return dividedLimit == 0 ? 1 : dividedLimit; } private async Task SendRequestInternalAsync(HttpRequestMessage requestMessage, diff --git a/MareSynchronos/WebAPI/Files/ThrottledStream.cs b/MareSynchronos/WebAPI/Files/ThrottledStream.cs new file mode 100644 index 0000000..0e50a68 --- /dev/null +++ b/MareSynchronos/WebAPI/Files/ThrottledStream.cs @@ -0,0 +1,217 @@ +using Microsoft.Extensions.Logging; + +namespace MareSynchronos.WebAPI.Files +{ + /// + /// Class for streaming data with throttling support. + /// Borrowed from https://github.com/bezzad/Downloader + /// + internal class ThrottledStream : Stream + { + public static long Infinite => long.MaxValue; + private readonly Stream _baseStream; + private long _bandwidthLimit; + private Bandwidth _bandwidth; + private CancellationTokenSource _bandwidthChangeTokenSource = new CancellationTokenSource(); + + /// + /// Initializes a new instance of the class. + /// + /// The base stream. + /// The maximum bytes per second that can be transferred through the base stream. + /// Thrown when is a null reference. + /// Thrown when is a negative value. + public ThrottledStream(Stream baseStream, long bandwidthLimit) + { + if (bandwidthLimit < 0) + { + throw new ArgumentOutOfRangeException(nameof(bandwidthLimit), + bandwidthLimit, "The maximum number of bytes per second can't be negative."); + } + + _baseStream = baseStream ?? throw new ArgumentNullException(nameof(baseStream)); + BandwidthLimit = bandwidthLimit; + } + + /// + /// Bandwidth Limit (in B/s) + /// + /// The maximum bytes per second. + public long BandwidthLimit + { + get => _bandwidthLimit; + set + { + if (_bandwidthLimit == value) return; + _bandwidthLimit = value <= 0 ? Infinite : value; + _bandwidth ??= new Bandwidth(); + _bandwidth.BandwidthLimit = _bandwidthLimit; + _bandwidthChangeTokenSource.Cancel(); + _bandwidthChangeTokenSource.Dispose(); + _bandwidthChangeTokenSource = new(); + } + } + + /// + public override bool CanRead => _baseStream.CanRead; + + /// + public override bool CanSeek => _baseStream.CanSeek; + + /// + public override bool CanWrite => _baseStream.CanWrite; + + /// + public override long Length => _baseStream.Length; + + /// + public override long Position + { + get => _baseStream.Position; + set => _baseStream.Position = value; + } + + /// + public override void Flush() + { + _baseStream.Flush(); + } + + /// + public override long Seek(long offset, SeekOrigin origin) + { + return _baseStream.Seek(offset, origin); + } + + /// + public override void SetLength(long value) + { + _baseStream.SetLength(value); + } + + /// + public override int Read(byte[] buffer, int offset, int count) + { + Throttle(count).Wait(); + return _baseStream.Read(buffer, offset, count); + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, + CancellationToken cancellationToken) + { + await Throttle(count, cancellationToken).ConfigureAwait(false); + return await _baseStream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); + } + + /// + public override void Write(byte[] buffer, int offset, int count) + { + Throttle(count).Wait(); + _baseStream.Write(buffer, offset, count); + } + + /// + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + await Throttle(count, cancellationToken).ConfigureAwait(false); + await _baseStream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); + } + + public override void Close() + { + _baseStream.Close(); + base.Close(); + } + + private async Task Throttle(int transmissionVolume, CancellationToken token = default) + { + // Make sure the buffer isn't empty. + if (BandwidthLimit > 0 && transmissionVolume > 0) + { + // Calculate the time to sleep. + _bandwidth.CalculateSpeed(transmissionVolume); + await Sleep(_bandwidth.PopSpeedRetrieveTime(), token).ConfigureAwait(false); + } + } + + private async Task Sleep(int time, CancellationToken token = default) + { + try + { + if (time > 0) + { + var bandWidthtoken = _bandwidthChangeTokenSource.Token; + var linked = CancellationTokenSource.CreateLinkedTokenSource(token, bandWidthtoken).Token; + await Task.Delay(time, linked).ConfigureAwait(false); + } + } + catch (TaskCanceledException) + { + // ignore + } + } + + /// + public override string ToString() + { + return _baseStream.ToString(); + } + + private class Bandwidth + { + private long _count; + private int _lastSecondCheckpoint; + private long _lastTransferredBytesCount; + private int _speedRetrieveTime; + public double Speed { get; private set; } + public double AverageSpeed { get; private set; } + public long BandwidthLimit { get; set; } + + public Bandwidth() + { + BandwidthLimit = long.MaxValue; + Reset(); + } + + public void CalculateSpeed(long receivedBytesCount) + { + int elapsedTime = Environment.TickCount - _lastSecondCheckpoint + 1; + receivedBytesCount = Interlocked.Add(ref _lastTransferredBytesCount, receivedBytesCount); + double momentSpeed = receivedBytesCount * 1000 / elapsedTime; // B/s + + if (1000 < elapsedTime) + { + Speed = momentSpeed; + AverageSpeed = ((AverageSpeed * _count) + Speed) / (_count + 1); + _count++; + SecondCheckpoint(); + } + + if (momentSpeed >= BandwidthLimit) + { + var expectedTime = receivedBytesCount * 1000 / BandwidthLimit; + Interlocked.Add(ref _speedRetrieveTime, (int)expectedTime - elapsedTime); + } + } + + public int PopSpeedRetrieveTime() + { + return Interlocked.Exchange(ref _speedRetrieveTime, 0); + } + + public void Reset() + { + SecondCheckpoint(); + _count = 0; + Speed = 0; + AverageSpeed = 0; + } + + private void SecondCheckpoint() + { + Interlocked.Exchange(ref _lastSecondCheckpoint, Environment.TickCount); + Interlocked.Exchange(ref _lastTransferredBytesCount, 0); + } + } + } +}