diff --git a/MareAPI b/MareAPI index 126b54e..820a432 160000 --- a/MareAPI +++ b/MareAPI @@ -1 +1 @@ -Subproject commit 126b54e552e53dd6615afe0f2725040c322e0387 +Subproject commit 820a432ad9ac6c19f9908ec0f51671550e0fa151 diff --git a/MareSynchronos/FileCache/FileCacheManager.cs b/MareSynchronos/FileCache/FileCacheManager.cs index e1cad19..5986f86 100644 --- a/MareSynchronos/FileCache/FileCacheManager.cs +++ b/MareSynchronos/FileCache/FileCacheManager.cs @@ -275,7 +275,10 @@ public sealed class FileCacheManager : IDisposable _fileCaches[fileCache.Hash] = entries = new(); } - entries.Add(fileCache); + if (!entries.Exists(u => string.Equals(u.PrefixedFilePath, fileCache.PrefixedFilePath, StringComparison.OrdinalIgnoreCase))) + { + entries.Add(fileCache); + } } private FileCacheEntity? CreateFileCacheEntity(FileInfo fileInfo, string prefixedPath, string? hash = null) diff --git a/MareSynchronos/FileCache/PeriodicFileScanner.cs b/MareSynchronos/FileCache/PeriodicFileScanner.cs index 5145b53..e0c33ec 100644 --- a/MareSynchronos/FileCache/PeriodicFileScanner.cs +++ b/MareSynchronos/FileCache/PeriodicFileScanner.cs @@ -38,6 +38,7 @@ public sealed class PeriodicFileScanner : DisposableMediatorSubscriberBase } public long CurrentFileProgress => _currentFileProgress; + public long TotalFilesStorage { get; private set; } public long FileCacheSize { get; set; } public ConcurrentDictionary HaltScanLocks { get; set; } = new(StringComparer.Ordinal); public bool IsScanRunning => CurrentFileProgress > 0 || TotalFiles > 0; @@ -89,8 +90,17 @@ public sealed class PeriodicFileScanner : DisposableMediatorSubscriberBase Logger.LogWarning("Scanner is on framework, waiting for leaving thread before continuing"); await Task.Delay(250, token).ConfigureAwait(false); } - await _performanceCollector.LogPerformance(this, "PeriodicFileScan", - async () => await PeriodicFileScan(isForcedFromExternal, token).ConfigureAwait(false)).ConfigureAwait(false); + + Thread scanThread = new(() => _performanceCollector.LogPerformance(this, "PeriodicFileScan", () => PeriodicFileScan(isForcedFromExternal, token))) + { + Priority = ThreadPriority.Lowest, + IsBackground = true + }; + scanThread.Start(); + while (scanThread.IsAlive) + { + await Task.Delay(250).ConfigureAwait(false); + } if (isForcedFromExternal) isForcedFromExternal = false; TotalFiles = 0; _currentFileProgress = 0; @@ -169,7 +179,7 @@ public sealed class PeriodicFileScanner : DisposableMediatorSubscriberBase _scanCancellationTokenSource?.Cancel(); } - private async Task PeriodicFileScan(bool noWaiting, CancellationToken ct) + private void PeriodicFileScan(bool noWaiting, CancellationToken ct) { TotalFiles = 1; var penumbraDir = _ipcManager.PenumbraModDirectory; @@ -195,90 +205,113 @@ public sealed class PeriodicFileScanner : DisposableMediatorSubscriberBase Logger.LogDebug("Getting files from {penumbra} and {storage}", penumbraDir, _configService.Current.CacheFolder); string[] ext = { ".mdl", ".tex", ".mtrl", ".tmb", ".pap", ".avfx", ".atex", ".sklb", ".eid", ".phyb", ".scd", ".skp", ".shpk" }; - var scannedFilesTask = Task.Run(() => Directory.EnumerateFiles(penumbraDir!, "*.*", SearchOption.AllDirectories) - .Select(s => s.ToLowerInvariant()) - .Where(f => ext.Any(e => f.EndsWith(e, StringComparison.OrdinalIgnoreCase)) - && !f.Contains(@"\bg\", StringComparison.OrdinalIgnoreCase) - && !f.Contains(@"\bgcommon\", StringComparison.OrdinalIgnoreCase) - && !f.Contains(@"\ui\", StringComparison.OrdinalIgnoreCase))); + Dictionary penumbraFiles = new(StringComparer.Ordinal); + foreach (var folder in Directory.EnumerateDirectories(penumbraDir!)) + { + penumbraFiles[folder] = Directory.GetFiles(folder, "*.*", SearchOption.AllDirectories) + .AsParallel() + .Where(f => ext.Any(e => f.EndsWith(e, StringComparison.OrdinalIgnoreCase)) + && !f.Contains(@"\bg\", StringComparison.OrdinalIgnoreCase) + && !f.Contains(@"\bgcommon\", StringComparison.OrdinalIgnoreCase) + && !f.Contains(@"\ui\", StringComparison.OrdinalIgnoreCase)).ToArray(); + Thread.Sleep(50); + if (ct.IsCancellationRequested) return; + } - var scannedCacheTask = Task.Run(() => Directory.EnumerateFiles(_configService.Current.CacheFolder, "*.*", SearchOption.TopDirectoryOnly) + var allCacheFiles = Directory.GetFiles(_configService.Current.CacheFolder, "*.*", SearchOption.TopDirectoryOnly) + .AsParallel() .Where(f => { var val = f.Split('\\').Last(); return val.Length == 40 || (val.Split('.').FirstOrDefault()?.Length ?? 0) == 40; - }) - .Select(s => s.ToLowerInvariant())); + }); + if (ct.IsCancellationRequested) return; - var scannedFiles = (await scannedFilesTask.ConfigureAwait(true)) - .Concat(await scannedCacheTask.ConfigureAwait(true)) + var allScannedFiles = (penumbraFiles.SelectMany(k => k.Value)) + .Concat(allCacheFiles) .Distinct(StringComparer.OrdinalIgnoreCase) - .ToDictionary(t => t, t => false, StringComparer.OrdinalIgnoreCase); - TotalFiles = scannedFiles.Count; + .ToDictionary(t => t.ToLowerInvariant(), t => false, StringComparer.OrdinalIgnoreCase); + + TotalFiles = allScannedFiles.Count; Thread.CurrentThread.Priority = previousThreadPriority; - await Task.Delay(TimeSpan.FromSeconds(2), ct).ConfigureAwait(false); + Thread.Sleep(TimeSpan.FromSeconds(2)); + + if (ct.IsCancellationRequested) return; // scan files from database - var cpuCount = Math.Clamp((int)(Environment.ProcessorCount / 2.0f), 2, 8); + var threadCount = Math.Clamp((int)(Environment.ProcessorCount / 2.0f), 2, 8); List entitiesToRemove = new(); List entitiesToUpdate = new(); object sync = new(); - try + Thread[] workerThreads = new Thread[threadCount]; + + ConcurrentQueue fileCaches = new(_fileDbManager.GetAllFileCaches()); + + TotalFilesStorage = fileCaches.Count; + + for (int i = 0; i < threadCount; i++) { - Parallel.ForEach(_fileDbManager.GetAllFileCaches(), new ParallelOptions() + Logger.LogTrace("Creating Thread {i}", i); + workerThreads[i] = new((tcounter) => { - CancellationToken = ct, - MaxDegreeOfParallelism = cpuCount, - }, (cache) => - { - try + var threadNr = (int)tcounter!; + Logger.LogTrace("Spawning Worker Thread {i}", threadNr); + while (!ct.IsCancellationRequested && fileCaches.TryDequeue(out var workload)) { - if (ct.IsCancellationRequested) return; + try + { + if (ct.IsCancellationRequested) return; - if (!_ipcManager.CheckPenumbraApi()) - { - Logger.LogWarning("Penumbra not available"); - return; - } + if (!_ipcManager.CheckPenumbraApi()) + { + Logger.LogWarning("Penumbra not available"); + return; + } - var validatedCacheResult = _fileDbManager.ValidateFileCacheEntity(cache); - if (validatedCacheResult.State != FileState.RequireDeletion) - scannedFiles[validatedCacheResult.FileCache.ResolvedFilepath] = true; - if (validatedCacheResult.State == FileState.RequireUpdate) - { - Logger.LogTrace("To update: {path}", validatedCacheResult.FileCache.ResolvedFilepath); - lock (sync) { entitiesToUpdate.Add(validatedCacheResult.FileCache); } + var validatedCacheResult = _fileDbManager.ValidateFileCacheEntity(workload); + if (validatedCacheResult.State != FileState.RequireDeletion) + { + lock (sync) { allScannedFiles[validatedCacheResult.FileCache.ResolvedFilepath] = true; } + } + if (validatedCacheResult.State == FileState.RequireUpdate) + { + Logger.LogTrace("To update: {path}", validatedCacheResult.FileCache.ResolvedFilepath); + lock (sync) { entitiesToUpdate.Add(validatedCacheResult.FileCache); } + } + else if (validatedCacheResult.State == FileState.RequireDeletion) + { + Logger.LogTrace("To delete: {path}", validatedCacheResult.FileCache.ResolvedFilepath); + lock (sync) { entitiesToRemove.Add(validatedCacheResult.FileCache); } + } } - else if (validatedCacheResult.State == FileState.RequireDeletion) + catch (Exception ex) { - Logger.LogTrace("To delete: {path}", validatedCacheResult.FileCache.ResolvedFilepath); - lock (sync) { entitiesToRemove.Add(validatedCacheResult.FileCache); } + Logger.LogWarning(ex, "Failed validating {path}", workload.ResolvedFilepath); } - } - catch (Exception ex) - { - Logger.LogWarning(ex, "Failed validating {path}", cache.ResolvedFilepath); + Interlocked.Increment(ref _currentFileProgress); + if (!noWaiting) Thread.Sleep(5); } - Interlocked.Increment(ref _currentFileProgress); - if (!noWaiting) Thread.Sleep(5); - }); + Logger.LogTrace("Ending Worker Thread {i}", threadNr); + }) + { + Priority = ThreadPriority.Lowest, + IsBackground = true + }; + workerThreads[i].Start(i); + } - } - catch (OperationCanceledException) + while (!ct.IsCancellationRequested && workerThreads.Any(u => u.IsAlive)) { - return; - } - catch (Exception ex) - { - Logger.LogWarning(ex, "Error during enumerating FileCaches"); + Thread.Sleep(1000); } if (ct.IsCancellationRequested) return; + Logger.LogTrace("Threads exited"); if (!_ipcManager.CheckPenumbraApi()) { @@ -312,12 +345,12 @@ public sealed class PeriodicFileScanner : DisposableMediatorSubscriberBase if (ct.IsCancellationRequested) return; // scan new files - if (scannedFiles.Any(c => !c.Value)) + if (allScannedFiles.Any(c => !c.Value)) { - Parallel.ForEach(scannedFiles.Where(c => !c.Value).Select(c => c.Key), + Parallel.ForEach(allScannedFiles.Where(c => !c.Value).Select(c => c.Key), new ParallelOptions() { - MaxDegreeOfParallelism = cpuCount, + MaxDegreeOfParallelism = threadCount, CancellationToken = ct }, (cachePath) => { @@ -343,14 +376,14 @@ public sealed class PeriodicFileScanner : DisposableMediatorSubscriberBase if (!noWaiting) Thread.Sleep(5); }); - Logger.LogTrace("Scanner added {notScanned} new files to db", scannedFiles.Count(c => !c.Value)); + Logger.LogTrace("Scanner added {notScanned} new files to db", allScannedFiles.Count(c => !c.Value)); } Logger.LogDebug("Scan complete"); TotalFiles = 0; _currentFileProgress = 0; entitiesToRemove.Clear(); - scannedFiles.Clear(); + allScannedFiles.Clear(); if (!_configService.Current.InitialScanComplete) { diff --git a/MareSynchronos/UI/UISharedService.cs b/MareSynchronos/UI/UISharedService.cs index fac57ae..7cdaba0 100644 --- a/MareSynchronos/UI/UISharedService.cs +++ b/MareSynchronos/UI/UISharedService.cs @@ -578,7 +578,10 @@ public partial class UiSharedService : DisposableMediatorSubscriberBase ImGui.SameLine(); ImGui.Text(_cacheScanner.TotalFiles == 1 ? "Collecting files" - : $"Processing {_cacheScanner.CurrentFileProgress} / {_cacheScanner.TotalFiles} files"); + : $"Processing {_cacheScanner.CurrentFileProgress}/{_cacheScanner.TotalFilesStorage} from storage ({_cacheScanner.TotalFiles} scanned in)"); + AttachToolTip("Note: it is possible to have more files in storage than scanned in, " + + "this is due to the scanner normally ignoring those files but the game loading them in and using them on your character, so they get " + + "added to the local storage."); } else if (_configService.Current.FileScanPaused) { diff --git a/MareSynchronos/WebAPI/Files/FileDownloadManager.cs b/MareSynchronos/WebAPI/Files/FileDownloadManager.cs index 04fb305..8daa26b 100644 --- a/MareSynchronos/WebAPI/Files/FileDownloadManager.cs +++ b/MareSynchronos/WebAPI/Files/FileDownloadManager.cs @@ -66,7 +66,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase base.Dispose(disposing); } - private static void MungeBuffer(Span buffer) + public static void MungeBuffer(Span buffer) { for (int i = 0; i < buffer.Length; ++i) { diff --git a/MareSynchronos/WebAPI/Files/FileUploadManager.cs b/MareSynchronos/WebAPI/Files/FileUploadManager.cs index 91e2cc4..dfd39c3 100644 --- a/MareSynchronos/WebAPI/Files/FileUploadManager.cs +++ b/MareSynchronos/WebAPI/Files/FileUploadManager.cs @@ -145,10 +145,7 @@ public sealed class FileUploadManager : DisposableMediatorSubscriberBase try { - if (!_mareConfigService.Current.UseAlternativeFileUpload) - await UploadFileStream(compressedFile, fileHash, uploadToken).ConfigureAwait(false); - else - await UploadFileFull(compressedFile, fileHash, uploadToken).ConfigureAwait(false); + await UploadFileStream(compressedFile, fileHash, _mareConfigService.Current.UseAlternativeFileUpload, uploadToken).ConfigureAwait(false); _verifiedUploadedHashes[fileHash] = DateTime.UtcNow; } catch (Exception ex) @@ -156,7 +153,7 @@ public sealed class FileUploadManager : DisposableMediatorSubscriberBase if (!_mareConfigService.Current.UseAlternativeFileUpload && ex is not OperationCanceledException) { Logger.LogWarning(ex, "[{hash}] Error during file upload, trying alternative file upload", fileHash); - await UploadFileFull(compressedFile, fileHash, uploadToken).ConfigureAwait(false); + await UploadFileStream(compressedFile, fileHash, true, uploadToken).ConfigureAwait(false); } else { @@ -165,18 +162,13 @@ public sealed class FileUploadManager : DisposableMediatorSubscriberBase } } - private async Task UploadFileFull(byte[] compressedFile, string fileHash, CancellationToken uploadToken) + private async Task UploadFileStream(byte[] compressedFile, string fileHash, bool munged, CancellationToken uploadToken) { - using var content = new ByteArrayContent(compressedFile); - content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream"); + if (munged) + { + FileDownloadManager.MungeBuffer(compressedFile.AsSpan()); + } - var response = await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.ServerFilesUploadFullPath(_orchestrator.FilesCdnUri!, fileHash), content, uploadToken).ConfigureAwait(false); - Logger.LogDebug("[{hash}] Upload Status: {status}", fileHash, response.StatusCode); - CurrentUploads.Single(f => string.Equals(f.Hash, fileHash, StringComparison.Ordinal)).Transferred = compressedFile.Length; - } - - private async Task UploadFileStream(byte[] compressedFile, string fileHash, CancellationToken uploadToken) - { using var ms = new MemoryStream(compressedFile); Progress prog = new((prog) => @@ -192,8 +184,11 @@ public sealed class FileUploadManager : DisposableMediatorSubscriberBase }); var streamContent = new ProgressableStreamContent(ms, prog); streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream"); - - var response = await _orchestrator.SendRequestStreamAsync(HttpMethod.Post, MareFiles.ServerFilesUploadFullPath(_orchestrator.FilesCdnUri!, fileHash), streamContent, uploadToken).ConfigureAwait(false); + HttpResponseMessage response; + if (!munged) + response = await _orchestrator.SendRequestStreamAsync(HttpMethod.Post, MareFiles.ServerFilesUploadFullPath(_orchestrator.FilesCdnUri!, fileHash), streamContent, uploadToken).ConfigureAwait(false); + else + response = await _orchestrator.SendRequestStreamAsync(HttpMethod.Post, MareFiles.ServerFilesUploadMunged(_orchestrator.FilesCdnUri!, fileHash), streamContent, uploadToken).ConfigureAwait(false); Logger.LogDebug("[{hash}] Upload Status: {status}", fileHash, response.StatusCode); }