ReadAsync/WriteAsync overloads

This commit is contained in:
Loporrit
2025-06-30 15:28:07 +00:00
parent 660319d74b
commit c8e988abc5
3 changed files with 51 additions and 11 deletions

View File

@@ -41,8 +41,7 @@ public class HashingStream : Stream
public override int Read(byte[] buffer, int offset, int count) public override int Read(byte[] buffer, int offset, int count)
{ {
if (_finished) ObjectDisposedException.ThrowIf(_finished, this);
throw new ObjectDisposedException("HashingStream");
int n = _stream.Read(buffer, offset, count); int n = _stream.Read(buffer, offset, count);
if (n > 0) if (n > 0)
_hashAlgo.TransformBlock(buffer, offset, n, buffer, offset); _hashAlgo.TransformBlock(buffer, offset, n, buffer, offset);
@@ -56,15 +55,13 @@ public class HashingStream : Stream
public override void SetLength(long value) public override void SetLength(long value)
{ {
if (_finished) ObjectDisposedException.ThrowIf(_finished, this);
throw new ObjectDisposedException("HashingStream");
_stream.SetLength(value); _stream.SetLength(value);
} }
public override void Write(byte[] buffer, int offset, int count) public override void Write(byte[] buffer, int offset, int count)
{ {
if (_finished) ObjectDisposedException.ThrowIf(_finished, this);
throw new ObjectDisposedException("HashingStream");
_stream.Write(buffer, offset, count); _stream.Write(buffer, offset, count);
string x = new(System.Text.Encoding.ASCII.GetChars(buffer.AsSpan().Slice(offset, count).ToArray())); string x = new(System.Text.Encoding.ASCII.GetChars(buffer.AsSpan().Slice(offset, count).ToArray()));
_hashAlgo.TransformBlock(buffer, offset, count, buffer, offset); _hashAlgo.TransformBlock(buffer, offset, count, buffer, offset);
@@ -73,10 +70,10 @@ public class HashingStream : Stream
public byte[] Finish() public byte[] Finish()
{ {
if (_finished) if (_finished)
return _hashAlgo.Hash; return _hashAlgo.Hash!;
_hashAlgo.TransformFinalBlock(Array.Empty<byte>(), 0, 0); _hashAlgo.TransformFinalBlock(Array.Empty<byte>(), 0, 0);
if (DisposeUnderlying) if (DisposeUnderlying)
_stream.Dispose(); _stream.Dispose();
return _hashAlgo.Hash; return _hashAlgo.Hash!;
} }
} }

View File

@@ -59,7 +59,21 @@ public class LimitedStream : Stream
if (count > remainder) if (count > remainder)
count = remainder; count = remainder;
#pragma warning disable CA1835
int n = await _stream.ReadAsync(buffer, offset, count, cancellationToken); int n = await _stream.ReadAsync(buffer, offset, count, cancellationToken);
#pragma warning restore CA1835
_estimatedPosition += n;
return n;
}
public async override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
{
int remainder = (int)long.Clamp(MaxPosition - _estimatedPosition, 0, int.MaxValue);
if (buffer.Length > remainder)
buffer = buffer[..remainder];
int n = await _stream.ReadAsync(buffer, cancellationToken);
_estimatedPosition += n; _estimatedPosition += n;
return n; return n;
} }
@@ -94,7 +108,20 @@ public class LimitedStream : Stream
if (count > remainder) if (count > remainder)
count = remainder; count = remainder;
#pragma warning disable CA1835
await _stream.WriteAsync(buffer, offset, count, cancellationToken); await _stream.WriteAsync(buffer, offset, count, cancellationToken);
#pragma warning restore CA1835
_estimatedPosition += count; _estimatedPosition += count;
} }
public async override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
int remainder = (int)long.Clamp(MaxPosition - _estimatedPosition, 0, int.MaxValue);
if (buffer.Length > remainder)
buffer = buffer[..remainder];
await _stream.WriteAsync(buffer, cancellationToken);
_estimatedPosition += buffer.Length;
}
} }

View File

@@ -9,8 +9,8 @@
public static long Infinite => long.MaxValue; public static long Infinite => long.MaxValue;
private readonly Stream _baseStream; private readonly Stream _baseStream;
private long _bandwidthLimit; private long _bandwidthLimit;
private Bandwidth _bandwidth; private readonly Bandwidth _bandwidth = new();
private CancellationTokenSource _bandwidthChangeTokenSource = new CancellationTokenSource(); private CancellationTokenSource _bandwidthChangeTokenSource = new();
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="T:ThrottledStream" /> class. /// Initializes a new instance of the <see cref="T:ThrottledStream" /> class.
@@ -42,7 +42,6 @@
{ {
if (_bandwidthLimit == value) return; if (_bandwidthLimit == value) return;
_bandwidthLimit = value <= 0 ? Infinite : value; _bandwidthLimit = value <= 0 ? Infinite : value;
_bandwidth ??= new Bandwidth();
_bandwidth.BandwidthLimit = _bandwidthLimit; _bandwidth.BandwidthLimit = _bandwidthLimit;
_bandwidthChangeTokenSource.Cancel(); _bandwidthChangeTokenSource.Cancel();
_bandwidthChangeTokenSource.Dispose(); _bandwidthChangeTokenSource.Dispose();
@@ -98,7 +97,15 @@
CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
await Throttle(count, cancellationToken).ConfigureAwait(false); await Throttle(count, cancellationToken).ConfigureAwait(false);
#pragma warning disable CA1835
return await _baseStream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); return await _baseStream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
#pragma warning restore CA1835
}
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
{
await Throttle(buffer.Length, cancellationToken).ConfigureAwait(false);
return await _baseStream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
} }
/// <inheritdoc /> /// <inheritdoc />
@@ -112,7 +119,16 @@
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{ {
await Throttle(count, cancellationToken).ConfigureAwait(false); await Throttle(count, cancellationToken).ConfigureAwait(false);
#pragma warning disable CA1835
await _baseStream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false); await _baseStream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
#pragma warning restore CA1835
}
/// <inheritdoc />
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
await Throttle(buffer.Length, cancellationToken).ConfigureAwait(false);
await _baseStream.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);
} }
public override void Close() public override void Close()