Use streamable compression

This commit is contained in:
Loporrit
2023-12-18 12:27:22 +00:00
parent 14f0b10244
commit c843af1470
9 changed files with 214 additions and 36 deletions

View File

@@ -1,11 +1,12 @@
using Dalamud.Utility;
using LZ4;
using K4os.Compression.LZ4.Streams;
using MareSynchronos.API.Data;
using MareSynchronos.API.Dto.Files;
using MareSynchronos.API.Routes;
using MareSynchronos.FileCache;
using MareSynchronos.PlayerData.Handlers;
using MareSynchronos.Services.Mediator;
using MareSynchronos.Utils;
using MareSynchronos.WebAPI.Files.Models;
using Microsoft.Extensions.Logging;
using System.Net;
@@ -49,14 +50,6 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
// NOTE(review): this returns true when there are NO entries in CurrentDownloads,
// which reads as inverted for a property named "IsDownloading" — presumably it
// should be `CurrentDownloads.Any()`; confirm against callers before changing.
public bool IsDownloading => !CurrentDownloads.Any();
/// <summary>
/// Obfuscates or de-obfuscates <paramref name="buffer"/> in place by XOR-ing
/// every byte with 42. XOR is its own inverse, so applying this twice restores
/// the original contents.
/// </summary>
/// <param name="buffer">Bytes to transform in place.</param>
public static void MungeBuffer(Span<byte> buffer)
{
    int idx = 0;
    while (idx < buffer.Length)
    {
        buffer[idx] = (byte)(buffer[idx] ^ 42);
        ++idx;
    }
}
public void CancelDownload()
{
CurrentDownloads.Clear();
@@ -95,27 +88,27 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
base.Dispose(disposing);
}
private static byte MungeByte(int byteOrEof)
private static byte ConvertReadByte(int byteOrEof)
{
if (byteOrEof == -1)
{
throw new EndOfStreamException();
}
return (byte)(byteOrEof ^ 42);
return (byte)byteOrEof;
}
private static (string fileHash, long fileLengthBytes) ReadBlockFileHeader(FileStream fileBlockStream)
{
List<char> hashName = [];
List<char> fileLength = [];
var separator = (char)MungeByte(fileBlockStream.ReadByte());
var separator = (char)ConvertReadByte(fileBlockStream.ReadByte());
if (separator != '#') throw new InvalidDataException("Data is invalid, first char is not #");
bool readHash = false;
while (true)
{
var readChar = (char)MungeByte(fileBlockStream.ReadByte());
var readChar = (char)ConvertReadByte(fileBlockStream.ReadByte());
if (readChar == ':')
{
readHash = true;
@@ -172,8 +165,6 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
{
ct.ThrowIfCancellationRequested();
MungeBuffer(buffer.AsSpan(0, bytesRead));
await fileStream.WriteAsync(buffer.AsMemory(0, bytesRead), ct).ConfigureAwait(false);
progress.Report(bytesRead);
@@ -313,13 +304,14 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
Logger.LogDebug("Found file {file} with length {le}, decompressing download", fileHash, fileLengthBytes);
var fileExtension = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase)).GamePaths[0].Split(".")[^1];
byte[] compressedFileContent = new byte[fileLengthBytes];
_ = await fileBlockStream.ReadAsync(compressedFileContent, token).ConfigureAwait(false);
MungeBuffer(compressedFileContent);
using var decompressedFile = new MemoryStream(64 * 1024);
using var innerFileStream = new LimitedStream(fileBlockStream, fileLengthBytes);
innerFileStream.DisposeUnderlying = false;
using var decStream = LZ4Stream.Decode(innerFileStream, 0, true);
await decStream.CopyToAsync(decompressedFile, token);
var decompressedFile = LZ4Codec.Unwrap(compressedFileContent);
var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension);
await _fileCompactor.WriteAllBytesAsync(filePath, decompressedFile, token).ConfigureAwait(false);
await _fileCompactor.WriteAllBytesAsync(filePath, decompressedFile.ToArray(), token).ConfigureAwait(false);
PersistFileToStorage(fileHash, filePath);
}

View File

@@ -11,6 +11,7 @@ using Microsoft.Extensions.Logging;
using System.Net.Http.Headers;
using System.Net.Http.Json;
namespace MareSynchronos.WebAPI.Files;
public sealed class FileUploadManager : DisposableMediatorSubscriberBase
@@ -164,10 +165,7 @@ public sealed class FileUploadManager : DisposableMediatorSubscriberBase
private async Task UploadFileStream(byte[] compressedFile, string fileHash, bool munged, CancellationToken uploadToken)
{
if (munged)
{
throw new NotImplementedException();
FileDownloadManager.MungeBuffer(compressedFile.AsSpan());
}
using var ms = new MemoryStream(compressedFile);
@@ -234,10 +232,10 @@ public sealed class FileUploadManager : DisposableMediatorSubscriberBase
{
Logger.LogDebug("[{hash}] Compressing", file);
var data = await _fileDbManager.GetCompressedFileData(file.Hash, uploadToken).ConfigureAwait(false);
CurrentUploads.Single(e => string.Equals(e.Hash, data.Item1, StringComparison.Ordinal)).Total = data.Item2.Length;
Logger.LogDebug("[{hash}] Starting upload for {filePath}", data.Item1, _fileDbManager.GetFileCacheByHash(data.Item1)!.ResolvedFilepath);
CurrentUploads.Single(e => string.Equals(e.Hash, file.Hash, StringComparison.Ordinal)).Total = data.Length;
Logger.LogDebug("[{hash}] Starting upload for {filePath}", file.Hash, _fileDbManager.GetFileCacheByHash(file.Hash)!.ResolvedFilepath);
await uploadTask.ConfigureAwait(false);
uploadTask = UploadFile(data.Item2, file.Hash, uploadToken);
uploadTask = UploadFile(data, file.Hash, uploadToken);
uploadToken.ThrowIfCancellationRequested();
}