diff --git a/MareSynchronos/Utils/HashingStream.cs b/MareSynchronos/Utils/HashingStream.cs index 7b89740..fe35f90 100644 --- a/MareSynchronos/Utils/HashingStream.cs +++ b/MareSynchronos/Utils/HashingStream.cs @@ -41,8 +41,7 @@ public class HashingStream : Stream public override int Read(byte[] buffer, int offset, int count) { - if (_finished) - throw new ObjectDisposedException("HashingStream"); + ObjectDisposedException.ThrowIf(_finished, this); int n = _stream.Read(buffer, offset, count); if (n > 0) _hashAlgo.TransformBlock(buffer, offset, n, buffer, offset); @@ -56,15 +55,13 @@ public class HashingStream : Stream public override void SetLength(long value) { - if (_finished) - throw new ObjectDisposedException("HashingStream"); + ObjectDisposedException.ThrowIf(_finished, this); _stream.SetLength(value); } public override void Write(byte[] buffer, int offset, int count) { - if (_finished) - throw new ObjectDisposedException("HashingStream"); + ObjectDisposedException.ThrowIf(_finished, this); _stream.Write(buffer, offset, count); string x = new(System.Text.Encoding.ASCII.GetChars(buffer.AsSpan().Slice(offset, count).ToArray())); _hashAlgo.TransformBlock(buffer, offset, count, buffer, offset); @@ -73,10 +70,10 @@ public class HashingStream : Stream public byte[] Finish() { if (_finished) - return _hashAlgo.Hash; + return _hashAlgo.Hash!; _hashAlgo.TransformFinalBlock(Array.Empty(), 0, 0); if (DisposeUnderlying) _stream.Dispose(); - return _hashAlgo.Hash; + return _hashAlgo.Hash!; } } diff --git a/MareSynchronos/Utils/LimitedStream.cs b/MareSynchronos/Utils/LimitedStream.cs index daba37a..1561621 100644 --- a/MareSynchronos/Utils/LimitedStream.cs +++ b/MareSynchronos/Utils/LimitedStream.cs @@ -59,7 +59,21 @@ public class LimitedStream : Stream if (count > remainder) count = remainder; +#pragma warning disable CA1835 int n = await _stream.ReadAsync(buffer, offset, count, cancellationToken); +#pragma warning restore CA1835 + _estimatedPosition += n; + return n; + } + + public async override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken) + { + int remainder = (int)long.Clamp(MaxPosition - _estimatedPosition, 0, int.MaxValue); + + if (buffer.Length > remainder) + buffer = buffer[..remainder]; + + int n = await _stream.ReadAsync(buffer, cancellationToken); _estimatedPosition += n; return n; } @@ -94,7 +108,20 @@ public class LimitedStream : Stream if (count > remainder) count = remainder; +#pragma warning disable CA1835 await _stream.WriteAsync(buffer, offset, count, cancellationToken); +#pragma warning restore CA1835 _estimatedPosition += count; } + + public async override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken) + { + int remainder = (int)long.Clamp(MaxPosition - _estimatedPosition, 0, int.MaxValue); + + if (buffer.Length > remainder) + buffer = buffer[..remainder]; + + await _stream.WriteAsync(buffer, cancellationToken); + _estimatedPosition += buffer.Length; + } } diff --git a/MareSynchronos/WebAPI/Files/ThrottledStream.cs b/MareSynchronos/WebAPI/Files/ThrottledStream.cs index 8ff6fec..b0ceeda 100644 --- a/MareSynchronos/WebAPI/Files/ThrottledStream.cs +++ b/MareSynchronos/WebAPI/Files/ThrottledStream.cs @@ -9,8 +9,8 @@ public static long Infinite => long.MaxValue; private readonly Stream _baseStream; private long _bandwidthLimit; - private Bandwidth _bandwidth; - private CancellationTokenSource _bandwidthChangeTokenSource = new CancellationTokenSource(); + private readonly Bandwidth _bandwidth = new(); + private CancellationTokenSource _bandwidthChangeTokenSource = new(); /// /// Initializes a new instance of the class. @@ -42,7 +42,6 @@ { if (_bandwidthLimit == value) return; _bandwidthLimit = value <= 0 ? Infinite : value; - _bandwidth ??= new Bandwidth(); _bandwidth.BandwidthLimit = _bandwidthLimit; _bandwidthChangeTokenSource.Cancel(); _bandwidthChangeTokenSource.Dispose(); @@ -98,7 +97,15 @@ CancellationToken cancellationToken) { await Throttle(count, cancellationToken).ConfigureAwait(false); +#pragma warning disable CA1835 return await _baseStream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); +#pragma warning restore CA1835 + } + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken) + { + await Throttle(buffer.Length, cancellationToken).ConfigureAwait(false); + return await _baseStream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); } /// @@ -112,7 +119,16 @@ public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { await Throttle(count, cancellationToken).ConfigureAwait(false); +#pragma warning disable CA1835 await _baseStream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); +#pragma warning restore CA1835 + } + + /// + public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken) + { + await Throttle(buffer.Length, cancellationToken).ConfigureAwait(false); + await _baseStream.WriteAsync(buffer, cancellationToken).ConfigureAwait(false); } public override void Close()