Use streamable compression (needs file cache clear)

Loporrit
2023-12-18 12:51:36 +00:00
parent 0707d3eb54
commit 8cf4f50091
7 changed files with 297 additions and 141 deletions


@@ -1,4 +1,4 @@
-using LZ4;
+using K4os.Compression.LZ4.Streams;
 using MareSynchronos.API.Dto.Files;
 using MareSynchronos.API.Routes;
 using MareSynchronos.API.SignalR;
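
The import swap above captures the format change: the old LZ4 package's LZ4Codec operates on whole in-memory buffers using its own block ("wrap") framing, while K4os.Compression.LZ4.Streams implements the standard LZ4 frame format, which can be encoded and decoded incrementally. Files written in one framing cannot be read back with the other, hence "needs file cache clear" in the commit message. A minimal round-trip sketch of the frame API (illustration only, not code from this commit; the compression level is an arbitrary choice):

using System.IO;
using K4os.Compression.LZ4;
using K4os.Compression.LZ4.Streams;

static class Lz4FrameExample
{
    // Compress a buffer into the LZ4 frame format.
    public static byte[] Compress(byte[] input)
    {
        using var output = new MemoryStream();
        using (var encoder = LZ4Stream.Encode(output, LZ4Level.L09_HC, extraMemory: 0, leaveOpen: true))
            encoder.Write(input, 0, input.Length);
        return output.ToArray();
    }

    // Decompress an LZ4 frame back into a buffer.
    public static byte[] Decompress(byte[] input)
    {
        using var decoder = LZ4Stream.Decode(new MemoryStream(input), extraMemory: 0);
        using var output = new MemoryStream();
        decoder.CopyTo(output);
        return output.ToArray();
    }
}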
@@ -183,115 +183,68 @@ public class ServerFilesController : ControllerBase
                 return Ok();
             }
 
-            // copy the request body to memory
-            using var compressedFileStream = new MemoryStream();
-            await Request.Body.CopyToAsync(compressedFileStream, requestAborted).ConfigureAwait(false);
-
-            // decompress and copy the decompressed stream to memory
-            var data = LZ4Codec.Unwrap(compressedFileStream.ToArray());
-
-            // reset streams
-            compressedFileStream.Seek(0, SeekOrigin.Begin);
-
-            // compute hash to verify
-            var hashString = BitConverter.ToString(SHA1.HashData(data))
-                .Replace("-", "", StringComparison.Ordinal).ToUpperInvariant();
-            if (!string.Equals(hashString, hash, StringComparison.Ordinal))
-                throw new InvalidOperationException($"Hash does not match file, computed: {hashString}, expected: {hash}");
-
-            // save file
             var path = FilePathUtil.GetFilePath(_basePath, hash);
-            using var fileStream = new FileStream(path, FileMode.Create);
-            await compressedFileStream.CopyToAsync(fileStream).ConfigureAwait(false);
-
-            // update on db
-            await _mareDbContext.Files.AddAsync(new FileCache()
-            {
-                Hash = hash,
-                UploadDate = DateTime.UtcNow,
-                UploaderUID = MareUser,
-                Size = compressedFileStream.Length,
-                Uploaded = true
-            }).ConfigureAwait(false);
-            await _mareDbContext.SaveChangesAsync().ConfigureAwait(false);
-            _metricsClient.IncGauge(MetricsAPI.GaugeFilesTotal, 1);
-            _metricsClient.IncGauge(MetricsAPI.GaugeFilesTotalSize, compressedFileStream.Length);
-
-            _fileUploadLocks.TryRemove(hash, out _);
-
-            return Ok();
-        }
-        catch (Exception e)
-        {
-            _logger.LogError(e, "Error during file upload");
-            return BadRequest();
-        }
-        finally
-        {
-            fileLock.Release();
-        }
-    }
-
-    [HttpPost(MareFiles.ServerFiles_UploadRaw + "/{hash}")]
-    [RequestSizeLimit(200 * 1024 * 1024)]
-    public async Task<IActionResult> UploadFileRaw(string hash, CancellationToken requestAborted)
-    {
-        _logger.LogInformation("{user} uploading raw file {file}", MareUser, hash);
-        hash = hash.ToUpperInvariant();
-        var existingFile = await _mareDbContext.Files.SingleOrDefaultAsync(f => f.Hash == hash);
-        if (existingFile != null) return Ok();
-
-        SemaphoreSlim fileLock;
-        lock (_fileUploadLocks)
-        {
-            if (!_fileUploadLocks.TryGetValue(hash, out fileLock))
-                _fileUploadLocks[hash] = fileLock = new SemaphoreSlim(1);
-        }
-
-        await fileLock.WaitAsync(requestAborted).ConfigureAwait(false);
-
-        try
-        {
-            var existingFileCheck2 = await _mareDbContext.Files.SingleOrDefaultAsync(f => f.Hash == hash);
-            if (existingFileCheck2 != null)
-            {
-                return Ok();
-            }
-
-            // copy the request body to memory
-            using var rawFileStream = new MemoryStream();
-            await Request.Body.CopyToAsync(rawFileStream, requestAborted).ConfigureAwait(false);
-
-            // reset streams
-            rawFileStream.Seek(0, SeekOrigin.Begin);
-
-            // compute hash to verify
-            var hashString = BitConverter.ToString(SHA1.HashData(rawFileStream.ToArray()))
-                .Replace("-", "", StringComparison.Ordinal).ToUpperInvariant();
-            if (!string.Equals(hashString, hash, StringComparison.Ordinal))
-                throw new InvalidOperationException($"Hash does not match file, computed: {hashString}, expected: {hash}");
-
-            // save file
-            var path = FilePathUtil.GetFilePath(_basePath, hash);
-            using var fileStream = new FileStream(path, FileMode.Create);
-            var lz4 = LZ4Codec.WrapHC(rawFileStream.ToArray(), 0, (int)rawFileStream.Length);
-            using var compressedStream = new MemoryStream(lz4);
-            await compressedStream.CopyToAsync(fileStream).ConfigureAwait(false);
+            var tmpPath = path + ".tmp";
+            long compressedSize = -1;
+
+            try
+            {
+                // Write incoming file to a temporary file while also hashing the decompressed content
+                // Stream flow diagram:
+                //   Request.Body ==> (Tee) ==> FileStream
+                //                    ==> CountedStream ==> LZ4DecoderStream ==> HashingStream ==> Stream.Null
+
+                // Reading via TeeStream causes the request body to be copied to tmpPath
+                using var tmpFileStream = new FileStream(tmpPath, FileMode.Create);
+                using var teeStream = new TeeStream(Request.Body, tmpFileStream);
+                teeStream.DisposeUnderlying = false;
+                // Read via CountedStream to count the number of compressed bytes
+                using var countStream = new CountedStream(teeStream);
+                countStream.DisposeUnderlying = false;
+                // The decompressed file content is read through LZ4DecoderStream, and written out to HashingStream
+                using var decStream = LZ4Stream.Decode(countStream, extraMemory: 0, leaveOpen: true);
+                // HashingStream simply hashes the decompressed bytes without writing them anywhere
+                using var hashStream = new HashingStream(Stream.Null, SHA1.Create());
+                hashStream.DisposeUnderlying = false;
+
+                await decStream.CopyToAsync(hashStream, requestAborted).ConfigureAwait(false);
+                decStream.Close();
+
+                var hashString = BitConverter.ToString(hashStream.Finish())
+                    .Replace("-", "", StringComparison.Ordinal).ToUpperInvariant();
+                if (!string.Equals(hashString, hash, StringComparison.Ordinal))
+                    throw new InvalidOperationException($"Hash does not match file, computed: {hashString}, expected: {hash}");
+
+                compressedSize = countStream.BytesRead;
+
+                // File content is verified -- move it to its final location
+                System.IO.File.Move(tmpPath, path, true);
+            }
+            catch
+            {
+                try
+                {
+                    System.IO.File.Delete(tmpPath);
+                }
+                catch { }
+                throw;
+            }
 
             // update on db
             await _mareDbContext.Files.AddAsync(new FileCache()
             {
                 Hash = hash,
                 UploadDate = DateTime.UtcNow,
                 UploaderUID = MareUser,
-                Size = compressedStream.Length,
+                Size = compressedSize,
                 Uploaded = true
             }).ConfigureAwait(false);
             await _mareDbContext.SaveChangesAsync().ConfigureAwait(false);
             _metricsClient.IncGauge(MetricsAPI.GaugeFilesTotal, 1);
-            _metricsClient.IncGauge(MetricsAPI.GaugeFilesTotalSize, rawFileStream.Length);
+            _metricsClient.IncGauge(MetricsAPI.GaugeFilesTotalSize, compressedSize);
 
             _fileUploadLocks.TryRemove(hash, out _);
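
TeeStream, CountedStream and HashingStream are small stream adapters defined elsewhere in the repository; their definitions are not part of this diff. Below is a minimal sketch of the shape the call sites above imply — the class and member names (including the DisposeUnderlying flag) are taken from usage, everything else is an assumption, and the real implementations presumably also override the asynchronous read path, since ASP.NET Core request bodies reject synchronous reads by default:

using System;
using System.IO;
using System.Security.Cryptography;

// Read-side adapters: each wraps an inner stream and observes bytes as they pass.
public abstract class PassThroughReadStream : Stream
{
    protected readonly Stream Inner;
    public bool DisposeUnderlying { get; set; } = true;
    protected PassThroughReadStream(Stream inner) => Inner = inner;

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    protected override void Dispose(bool disposing)
    {
        if (disposing && DisposeUnderlying) Inner.Dispose();
        base.Dispose(disposing);
    }
}

// Copies every byte read from the source into a second stream as a side effect.
public sealed class TeeStream : PassThroughReadStream
{
    private readonly Stream _copy;
    public TeeStream(Stream source, Stream copy) : base(source) => _copy = copy;
    public override int Read(byte[] buffer, int offset, int count)
    {
        var read = Inner.Read(buffer, offset, count);
        if (read > 0) _copy.Write(buffer, offset, read);
        return read;
    }
}

// Counts the bytes flowing through the pipeline.
public sealed class CountedStream : PassThroughReadStream
{
    public long BytesRead { get; private set; }
    public CountedStream(Stream inner) : base(inner) { }
    public override int Read(byte[] buffer, int offset, int count)
    {
        var read = Inner.Read(buffer, offset, count);
        BytesRead += read;
        return read;
    }
}

// Write-side adapter: folds written bytes into a running hash while forwarding them
// to the inner stream (Stream.Null above, so the decompressed data is never stored).
public sealed class HashingStream : Stream
{
    private readonly Stream _inner;
    private readonly HashAlgorithm _hash;
    public bool DisposeUnderlying { get; set; } = true;
    public HashingStream(Stream inner, HashAlgorithm hash) { _inner = inner; _hash = hash; }

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count)
    {
        _hash.TransformBlock(buffer, offset, count, null, 0);
        _inner.Write(buffer, offset, count);
    }
    // Completes the hash computation and returns the digest.
    public byte[] Finish()
    {
        _hash.TransformFinalBlock(Array.Empty<byte>(), 0, 0);
        return _hash.Hash;
    }
    public override void Flush() => _inner.Flush();
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            _hash.Dispose();
            if (DisposeUnderlying) _inner.Dispose();
        }
        base.Dispose(disposing);
    }
}

Taken together, the rewrite replaces two full in-memory copies of every upload (the compressed body plus its decompressed image) with a single pass over the request body: compressed bytes are teed into the .tmp file and counted for the FileCache record while the decompressed content is hashed on the fly, and only after the hash matches is the temporary file moved over the final path, so an interrupted or corrupt upload never lands at the real location.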