diff --git a/MareSynchronos/FileCache/FileCacheManager.cs b/MareSynchronos/FileCache/FileCacheManager.cs index 853cc05..7648b0e 100644 --- a/MareSynchronos/FileCache/FileCacheManager.cs +++ b/MareSynchronos/FileCache/FileCacheManager.cs @@ -1,4 +1,4 @@ -using LZ4; +using K4os.Compression.LZ4.Streams; using MareSynchronos.Interop; using MareSynchronos.MareConfiguration; using MareSynchronos.Services.Mediator; @@ -221,11 +221,26 @@ public sealed class FileCacheManager : IDisposable return Path.Combine(_configService.Current.CacheFolder, hash + "." + extension); } - public async Task<(string, byte[])> GetCompressedFileData(string fileHash, CancellationToken uploadToken) + public async Task GetCompressedFileLength(string fileHash, CancellationToken uploadToken) { var fileCache = GetFileCacheByHash(fileHash)!.ResolvedFilepath; - return (fileHash, LZ4Codec.WrapHC(await File.ReadAllBytesAsync(fileCache, uploadToken).ConfigureAwait(false), 0, - (int)new FileInfo(fileCache).Length)); + using var fs = File.OpenRead(fileCache); + var cs = new CountedStream(Stream.Null); + using var encstream = LZ4Stream.Encode(cs, new LZ4EncoderSettings(){CompressionLevel=K4os.Compression.LZ4.LZ4Level.L09_HC}); + await fs.CopyToAsync(encstream, uploadToken).ConfigureAwait(false); + encstream.Close(); + return uploadToken.IsCancellationRequested ? 0 : cs.BytesWritten; + } + + public async Task GetCompressedFileData(string fileHash, CancellationToken uploadToken) + { + var fileCache = GetFileCacheByHash(fileHash)!.ResolvedFilepath; + using var fs = File.OpenRead(fileCache); + var ms = new MemoryStream(64 * 1024); + using var encstream = LZ4Stream.Encode(ms, new LZ4EncoderSettings(){CompressionLevel=K4os.Compression.LZ4.LZ4Level.L09_HC}); + await fs.CopyToAsync(encstream, uploadToken).ConfigureAwait(false); + encstream.Close(); + return ms.ToArray(); } public FileCacheEntity? GetFileCacheByHash(string hash) diff --git a/MareSynchronos/MareSynchronos.csproj b/MareSynchronos/MareSynchronos.csproj index e0a2bac..49f12d0 100644 --- a/MareSynchronos/MareSynchronos.csproj +++ b/MareSynchronos/MareSynchronos.csproj @@ -33,7 +33,8 @@ - + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/MareSynchronos/PlayerData/Export/MareCharaFileManager.cs b/MareSynchronos/PlayerData/Export/MareCharaFileManager.cs index 6090831..7b05a40 100644 --- a/MareSynchronos/PlayerData/Export/MareCharaFileManager.cs +++ b/MareSynchronos/PlayerData/Export/MareCharaFileManager.cs @@ -1,5 +1,5 @@ using Dalamud.Game.ClientState.Objects.Types; -using LZ4; +using K4os.Compression.LZ4.Legacy; using MareSynchronos.API.Data.Enum; using MareSynchronos.FileCache; using MareSynchronos.Interop; diff --git a/MareSynchronos/Services/CharacterAnalyzer.cs b/MareSynchronos/Services/CharacterAnalyzer.cs index 7097052..ff519bc 100644 --- a/MareSynchronos/Services/CharacterAnalyzer.cs +++ b/MareSynchronos/Services/CharacterAnalyzer.cs @@ -181,16 +181,16 @@ public sealed class CharacterAnalyzer : MediatorSubscriberBase, IDisposable public bool IsComputed => OriginalSize > 0 && CompressedSize > 0; public async Task ComputeSizes(FileCacheManager fileCacheManager, CancellationToken token) { - var compressedsize = await fileCacheManager.GetCompressedFileData(Hash, token).ConfigureAwait(false); + var compressedsize = await fileCacheManager.GetCompressedFileLength(Hash, token).ConfigureAwait(false); var normalSize = new FileInfo(FilePaths[0]).Length; var entries = fileCacheManager.GetAllFileCachesByHash(Hash); foreach (var entry in entries) { entry.Size = normalSize; - entry.CompressedSize = compressedsize.Item2.LongLength; + entry.CompressedSize = compressedsize; } OriginalSize = normalSize; - CompressedSize = compressedsize.Item2.LongLength; + CompressedSize = compressedsize; } public long OriginalSize { get; private set; } = OriginalSize; public long CompressedSize { get; private set; } = CompressedSize; diff --git a/MareSynchronos/UI/CompactUI.cs b/MareSynchronos/UI/CompactUI.cs index a09bdac..f3d3d5a 100644 --- a/MareSynchronos/UI/CompactUI.cs +++ b/MareSynchronos/UI/CompactUI.cs @@ -76,7 +76,7 @@ public class CompactUi : WindowMediatorSubscriberBase #if DEBUG string dev = "Dev Build"; var ver = Assembly.GetExecutingAssembly().GetName().Version!; - WindowName = $"Loporrit Sync {dev} ({ver.Major}.{ver.Minor}.{ver.Build}-lop{ver.Revision})###LoporritSyncMainUI"; + WindowName = $"Loporrit Sync {dev} ({ver.Major}.{ver.Minor}.{ver.Build}-lop{ver.Revision})###LoporritSyncMainUIDev"; Toggle(); #else var ver = Assembly.GetExecutingAssembly().GetName().Version; diff --git a/MareSynchronos/Utils/CountedStream.cs b/MareSynchronos/Utils/CountedStream.cs new file mode 100644 index 0000000..0f5e2e6 --- /dev/null +++ b/MareSynchronos/Utils/CountedStream.cs @@ -0,0 +1,72 @@ +namespace MareSynchronos.Utils; + +// Counts the number of bytes read/written to an underlying stream +public class CountedStream : Stream +{ + private readonly Stream _stream; + public long BytesRead { get; private set; } + public long BytesWritten { get; private set; } + public bool DisposeUnderlying = true; + + public Stream UnderlyingStream { get => _stream; } + + public CountedStream(Stream underlyingStream) + { + _stream = underlyingStream; + } + + protected override void Dispose(bool disposing) + { + if (!DisposeUnderlying) + return; + _stream.Dispose(); + } + + public override bool CanRead => _stream.CanRead; + public override bool CanSeek => _stream.CanSeek; + public override bool CanWrite => _stream.CanWrite; + public override long Length => _stream.Length; + + public override long Position { get => _stream.Position; set => _stream.Position = value; } + + public override void Flush() + { + _stream.Flush(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + int n = _stream.Read(buffer, offset, count); + BytesRead += n; + return n; + } + + public async override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + int n = await _stream.ReadAsync(buffer, offset, count, cancellationToken); + BytesRead += n; + return n; + } + + public override long Seek(long offset, SeekOrigin origin) + { + return _stream.Seek(offset, origin); + } + + public override void SetLength(long value) + { + _stream.SetLength(value); + } + + public override void Write(byte[] buffer, int offset, int count) + { + _stream.Write(buffer, offset, count); + BytesWritten += count; + } + + public async override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + await _stream.WriteAsync(buffer, offset, count, cancellationToken); + BytesWritten += count; + } +} diff --git a/MareSynchronos/Utils/LimitedStream.cs b/MareSynchronos/Utils/LimitedStream.cs new file mode 100644 index 0000000..6d62e26 --- /dev/null +++ b/MareSynchronos/Utils/LimitedStream.cs @@ -0,0 +1,100 @@ +namespace MareSynchronos.Utils; + +// Limits the number of bytes read/written to an underlying stream +public class LimitedStream : Stream +{ + private readonly Stream _stream; + public long _estimatedPosition = 0; + public long MaxPosition { get; private init; } + public bool DisposeUnderlying = true; + + public Stream UnderlyingStream { get => _stream; } + + public LimitedStream(Stream underlyingStream, long byteLimit) + { + _stream = underlyingStream; + try + { + _estimatedPosition = Position; + } + catch { } + MaxPosition = _estimatedPosition + byteLimit; + } + + protected override void Dispose(bool disposing) + { + if (!DisposeUnderlying) + return; + _stream.Dispose(); + } + + public override bool CanRead => _stream.CanRead; + public override bool CanSeek => _stream.CanSeek; + public override bool CanWrite => _stream.CanWrite; + public override long Length => _stream.Length; + + public override long Position { get => _stream.Position; set => _stream.Position = _estimatedPosition = value; } + + public override void Flush() + { + _stream.Flush(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + int remainder = (int)long.Clamp(MaxPosition - _estimatedPosition, 0, int.MaxValue); + + if (count > remainder) + count = remainder; + + int n = _stream.Read(buffer, offset, count); + _estimatedPosition += n; + return n; + } + + public async override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + int remainder = (int)long.Clamp(MaxPosition - _estimatedPosition, 0, int.MaxValue); + + if (count > remainder) + count = remainder; + + int n = await _stream.ReadAsync(buffer, offset, count, cancellationToken); + _estimatedPosition += n; + return n; + } + + public override long Seek(long offset, SeekOrigin origin) + { + long result = _stream.Seek(offset, origin); + _estimatedPosition = result; + return result; + } + + public override void SetLength(long value) + { + _stream.SetLength(value); + } + + public override void Write(byte[] buffer, int offset, int count) + { + int remainder = (int)long.Clamp(MaxPosition - _estimatedPosition, 0, int.MaxValue); + + if (count > remainder) + count = remainder; + + _stream.Write(buffer, offset, count); + _estimatedPosition += count; + } + + public async override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + int remainder = (int)long.Clamp(MaxPosition - _estimatedPosition, 0, int.MaxValue); + + if (count > remainder) + count = remainder; + + await _stream.WriteAsync(buffer, offset, count, cancellationToken); + _estimatedPosition += count; + } +} diff --git a/MareSynchronos/WebAPI/Files/FileDownloadManager.cs b/MareSynchronos/WebAPI/Files/FileDownloadManager.cs index de99f3f..75e6d6a 100644 --- a/MareSynchronos/WebAPI/Files/FileDownloadManager.cs +++ b/MareSynchronos/WebAPI/Files/FileDownloadManager.cs @@ -1,11 +1,12 @@ using Dalamud.Utility; -using LZ4; +using K4os.Compression.LZ4.Streams; using MareSynchronos.API.Data; using MareSynchronos.API.Dto.Files; using MareSynchronos.API.Routes; using MareSynchronos.FileCache; using MareSynchronos.PlayerData.Handlers; using MareSynchronos.Services.Mediator; +using MareSynchronos.Utils; using MareSynchronos.WebAPI.Files.Models; using Microsoft.Extensions.Logging; using System.Net; @@ -49,14 +50,6 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase public bool IsDownloading => !CurrentDownloads.Any(); - public static void MungeBuffer(Span buffer) - { - for (int i = 0; i < buffer.Length; ++i) - { - buffer[i] ^= 42; - } - } - public void CancelDownload() { CurrentDownloads.Clear(); @@ -95,27 +88,27 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase base.Dispose(disposing); } - private static byte MungeByte(int byteOrEof) + private static byte ConvertReadByte(int byteOrEof) { if (byteOrEof == -1) { throw new EndOfStreamException(); } - return (byte)(byteOrEof ^ 42); + return (byte)byteOrEof; } private static (string fileHash, long fileLengthBytes) ReadBlockFileHeader(FileStream fileBlockStream) { List hashName = []; List fileLength = []; - var separator = (char)MungeByte(fileBlockStream.ReadByte()); + var separator = (char)ConvertReadByte(fileBlockStream.ReadByte()); if (separator != '#') throw new InvalidDataException("Data is invalid, first char is not #"); bool readHash = false; while (true) { - var readChar = (char)MungeByte(fileBlockStream.ReadByte()); + var readChar = (char)ConvertReadByte(fileBlockStream.ReadByte()); if (readChar == ':') { readHash = true; @@ -172,8 +165,6 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase { ct.ThrowIfCancellationRequested(); - MungeBuffer(buffer.AsSpan(0, bytesRead)); - await fileStream.WriteAsync(buffer.AsMemory(0, bytesRead), ct).ConfigureAwait(false); progress.Report(bytesRead); @@ -313,13 +304,14 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase Logger.LogDebug("Found file {file} with length {le}, decompressing download", fileHash, fileLengthBytes); var fileExtension = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase)).GamePaths[0].Split(".")[^1]; - byte[] compressedFileContent = new byte[fileLengthBytes]; - _ = await fileBlockStream.ReadAsync(compressedFileContent, token).ConfigureAwait(false); - MungeBuffer(compressedFileContent); + using var decompressedFile = new MemoryStream(64 * 1024); + using var innerFileStream = new LimitedStream(fileBlockStream, fileLengthBytes); + innerFileStream.DisposeUnderlying = false; + using var decStream = LZ4Stream.Decode(innerFileStream, 0, true); + await decStream.CopyToAsync(decompressedFile, token); - var decompressedFile = LZ4Codec.Unwrap(compressedFileContent); var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension); - await _fileCompactor.WriteAllBytesAsync(filePath, decompressedFile, token).ConfigureAwait(false); + await _fileCompactor.WriteAllBytesAsync(filePath, decompressedFile.ToArray(), token).ConfigureAwait(false); PersistFileToStorage(fileHash, filePath); } diff --git a/MareSynchronos/WebAPI/Files/FileUploadManager.cs b/MareSynchronos/WebAPI/Files/FileUploadManager.cs index 68f3cbe..7d371e4 100644 --- a/MareSynchronos/WebAPI/Files/FileUploadManager.cs +++ b/MareSynchronos/WebAPI/Files/FileUploadManager.cs @@ -11,6 +11,7 @@ using Microsoft.Extensions.Logging; using System.Net.Http.Headers; using System.Net.Http.Json; + namespace MareSynchronos.WebAPI.Files; public sealed class FileUploadManager : DisposableMediatorSubscriberBase @@ -164,10 +165,7 @@ public sealed class FileUploadManager : DisposableMediatorSubscriberBase private async Task UploadFileStream(byte[] compressedFile, string fileHash, bool munged, CancellationToken uploadToken) { if (munged) - { throw new NotImplementedException(); - FileDownloadManager.MungeBuffer(compressedFile.AsSpan()); - } using var ms = new MemoryStream(compressedFile); @@ -234,10 +232,10 @@ public sealed class FileUploadManager : DisposableMediatorSubscriberBase { Logger.LogDebug("[{hash}] Compressing", file); var data = await _fileDbManager.GetCompressedFileData(file.Hash, uploadToken).ConfigureAwait(false); - CurrentUploads.Single(e => string.Equals(e.Hash, data.Item1, StringComparison.Ordinal)).Total = data.Item2.Length; - Logger.LogDebug("[{hash}] Starting upload for {filePath}", data.Item1, _fileDbManager.GetFileCacheByHash(data.Item1)!.ResolvedFilepath); + CurrentUploads.Single(e => string.Equals(e.Hash, file.Hash, StringComparison.Ordinal)).Total = data.Length; + Logger.LogDebug("[{hash}] Starting upload for {filePath}", file.Hash, _fileDbManager.GetFileCacheByHash(file.Hash)!.ResolvedFilepath); await uploadTask.ConfigureAwait(false); - uploadTask = UploadFile(data.Item2, file.Hash, uploadToken); + uploadTask = UploadFile(data, file.Hash, uploadToken); uploadToken.ThrowIfCancellationRequested(); }