rebuild file scanner to use threads instead of tasks for the periodic scan; change alternative file upload to be a munged file upload

rootdarkarchon
2023-09-19 23:56:14 +02:00
parent 0008428cb0
commit afb7800d20
6 changed files with 115 additions and 81 deletions

Submodule MareAPI updated: 126b54e552...820a432ad9


@@ -275,7 +275,10 @@ public sealed class FileCacheManager : IDisposable
_fileCaches[fileCache.Hash] = entries = new();
}
entries.Add(fileCache);
if (!entries.Exists(u => string.Equals(u.PrefixedFilePath, fileCache.PrefixedFilePath, StringComparison.OrdinalIgnoreCase)))
{
entries.Add(fileCache);
}
}
private FileCacheEntity? CreateFileCacheEntity(FileInfo fileInfo, string prefixedPath, string? hash = null)
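
For context, the guard added in this hunk prevents the same prefixed path from being registered twice under one hash. A minimal standalone sketch of that pattern follows; the entry type and index class are illustrative stand-ins, not the plugin's actual types.

using System;
using System.Collections.Generic;

// Illustrative stand-in for the plugin's file cache entry.
public sealed record CacheEntry(string Hash, string PrefixedFilePath);

public sealed class CacheIndex
{
    // Maps a content hash to every known entry that shares it.
    private readonly Dictionary<string, List<CacheEntry>> _byHash = new(StringComparer.Ordinal);

    public void Add(CacheEntry entry)
    {
        if (!_byHash.TryGetValue(entry.Hash, out var entries))
        {
            _byHash[entry.Hash] = entries = new List<CacheEntry>();
        }

        // Mirror of the new entries.Exists(...) guard: only add the entry if no
        // entry with the same path (compared case-insensitively) is present yet.
        if (!entries.Exists(e => string.Equals(e.PrefixedFilePath, entry.PrefixedFilePath, StringComparison.OrdinalIgnoreCase)))
        {
            entries.Add(entry);
        }
    }
}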


@@ -38,6 +38,7 @@ public sealed class PeriodicFileScanner : DisposableMediatorSubscriberBase
}
public long CurrentFileProgress => _currentFileProgress;
public long TotalFilesStorage { get; private set; }
public long FileCacheSize { get; set; }
public ConcurrentDictionary<string, int> HaltScanLocks { get; set; } = new(StringComparer.Ordinal);
public bool IsScanRunning => CurrentFileProgress > 0 || TotalFiles > 0;
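
The scanner publishes its progress through these properties while several threads increment the underlying counter; the later hunks use Interlocked.Increment for exactly that reason. A small sketch of the pattern, with illustrative names rather than the scanner's real fields:

using System.Threading;

public sealed class ScanProgress
{
    private long _current;

    // Read from the UI; written by worker threads via Report().
    public long CurrentFileProgress => Interlocked.Read(ref _current);

    public long TotalFiles { get; set; }
    public long TotalFilesStorage { get; set; }

    // Safe to call concurrently from any number of worker threads.
    public void Report() => Interlocked.Increment(ref _current);

    public void Reset()
    {
        Interlocked.Exchange(ref _current, 0);
        TotalFiles = 0;
        TotalFilesStorage = 0;
    }
}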
@@ -89,8 +90,17 @@ public sealed class PeriodicFileScanner : DisposableMediatorSubscriberBase
Logger.LogWarning("Scanner is on framework, waiting for leaving thread before continuing");
await Task.Delay(250, token).ConfigureAwait(false);
}
await _performanceCollector.LogPerformance(this, "PeriodicFileScan",
async () => await PeriodicFileScan(isForcedFromExternal, token).ConfigureAwait(false)).ConfigureAwait(false);
Thread scanThread = new(() => _performanceCollector.LogPerformance(this, "PeriodicFileScan", () => PeriodicFileScan(isForcedFromExternal, token)))
{
Priority = ThreadPriority.Lowest,
IsBackground = true
};
scanThread.Start();
while (scanThread.IsAlive)
{
await Task.Delay(250).ConfigureAwait(false);
}
if (isForcedFromExternal) isForcedFromExternal = false;
TotalFiles = 0;
_currentFileProgress = 0;
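
This hunk is the heart of the "threads instead of tasks" change: the scan now runs on a dedicated low-priority background Thread, and the async caller simply polls IsAlive until the thread exits. A self-contained sketch of that hand-off, assuming the scan delegate watches the cancellation token internally (as the reworked PeriodicFileScan does):

using System;
using System.Threading;
using System.Threading.Tasks;

public static class BackgroundScanRunner
{
    // Runs synchronous work on a low-priority background thread and returns
    // once that thread has exited, without blocking the calling async context.
    public static async Task RunOnLowPriorityThreadAsync(Action scanWork)
    {
        Thread scanThread = new(() => scanWork())
        {
            Priority = ThreadPriority.Lowest, // keep the scan out of the game's way
            IsBackground = true               // never keep the process alive for it
        };
        scanThread.Start();

        // Poll instead of Join so the caller stays fully asynchronous.
        while (scanThread.IsAlive)
        {
            await Task.Delay(250).ConfigureAwait(false);
        }
    }
}

Cancellation is left to the work delegate itself, which is why PeriodicFileScan keeps taking the token and checks ct.IsCancellationRequested rather than being awaited with it.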
@@ -169,7 +179,7 @@ public sealed class PeriodicFileScanner : DisposableMediatorSubscriberBase
_scanCancellationTokenSource?.Cancel();
}
private async Task PeriodicFileScan(bool noWaiting, CancellationToken ct)
private void PeriodicFileScan(bool noWaiting, CancellationToken ct)
{
TotalFiles = 1;
var penumbraDir = _ipcManager.PenumbraModDirectory;
@@ -195,90 +205,113 @@ public sealed class PeriodicFileScanner : DisposableMediatorSubscriberBase
Logger.LogDebug("Getting files from {penumbra} and {storage}", penumbraDir, _configService.Current.CacheFolder);
string[] ext = { ".mdl", ".tex", ".mtrl", ".tmb", ".pap", ".avfx", ".atex", ".sklb", ".eid", ".phyb", ".scd", ".skp", ".shpk" };
var scannedFilesTask = Task.Run(() => Directory.EnumerateFiles(penumbraDir!, "*.*", SearchOption.AllDirectories)
.Select(s => s.ToLowerInvariant())
.Where(f => ext.Any(e => f.EndsWith(e, StringComparison.OrdinalIgnoreCase))
&& !f.Contains(@"\bg\", StringComparison.OrdinalIgnoreCase)
&& !f.Contains(@"\bgcommon\", StringComparison.OrdinalIgnoreCase)
&& !f.Contains(@"\ui\", StringComparison.OrdinalIgnoreCase)));
Dictionary<string, string[]> penumbraFiles = new(StringComparer.Ordinal);
foreach (var folder in Directory.EnumerateDirectories(penumbraDir!))
{
penumbraFiles[folder] = Directory.GetFiles(folder, "*.*", SearchOption.AllDirectories)
.AsParallel()
.Where(f => ext.Any(e => f.EndsWith(e, StringComparison.OrdinalIgnoreCase))
&& !f.Contains(@"\bg\", StringComparison.OrdinalIgnoreCase)
&& !f.Contains(@"\bgcommon\", StringComparison.OrdinalIgnoreCase)
&& !f.Contains(@"\ui\", StringComparison.OrdinalIgnoreCase)).ToArray();
Thread.Sleep(50);
if (ct.IsCancellationRequested) return;
}
var scannedCacheTask = Task.Run(() => Directory.EnumerateFiles(_configService.Current.CacheFolder, "*.*", SearchOption.TopDirectoryOnly)
var allCacheFiles = Directory.GetFiles(_configService.Current.CacheFolder, "*.*", SearchOption.TopDirectoryOnly)
.AsParallel()
.Where(f =>
{
var val = f.Split('\\').Last();
return val.Length == 40 || (val.Split('.').FirstOrDefault()?.Length ?? 0) == 40;
})
.Select(s => s.ToLowerInvariant()));
});
if (ct.IsCancellationRequested) return;
var scannedFiles = (await scannedFilesTask.ConfigureAwait(true))
.Concat(await scannedCacheTask.ConfigureAwait(true))
var allScannedFiles = (penumbraFiles.SelectMany(k => k.Value))
.Concat(allCacheFiles)
.Distinct(StringComparer.OrdinalIgnoreCase)
.ToDictionary(t => t, t => false, StringComparer.OrdinalIgnoreCase);
TotalFiles = scannedFiles.Count;
.ToDictionary(t => t.ToLowerInvariant(), t => false, StringComparer.OrdinalIgnoreCase);
TotalFiles = allScannedFiles.Count;
Thread.CurrentThread.Priority = previousThreadPriority;
await Task.Delay(TimeSpan.FromSeconds(2), ct).ConfigureAwait(false);
Thread.Sleep(TimeSpan.FromSeconds(2));
if (ct.IsCancellationRequested) return;
// scan files from database
var cpuCount = Math.Clamp((int)(Environment.ProcessorCount / 2.0f), 2, 8);
var threadCount = Math.Clamp((int)(Environment.ProcessorCount / 2.0f), 2, 8);
List<FileCacheEntity> entitiesToRemove = new();
List<FileCacheEntity> entitiesToUpdate = new();
object sync = new();
try
Thread[] workerThreads = new Thread[threadCount];
ConcurrentQueue<FileCacheEntity> fileCaches = new(_fileDbManager.GetAllFileCaches());
TotalFilesStorage = fileCaches.Count;
for (int i = 0; i < threadCount; i++)
{
Parallel.ForEach(_fileDbManager.GetAllFileCaches(), new ParallelOptions()
Logger.LogTrace("Creating Thread {i}", i);
workerThreads[i] = new((tcounter) =>
{
CancellationToken = ct,
MaxDegreeOfParallelism = cpuCount,
}, (cache) =>
{
try
var threadNr = (int)tcounter!;
Logger.LogTrace("Spawning Worker Thread {i}", threadNr);
while (!ct.IsCancellationRequested && fileCaches.TryDequeue(out var workload))
{
if (ct.IsCancellationRequested) return;
try
{
if (ct.IsCancellationRequested) return;
if (!_ipcManager.CheckPenumbraApi())
{
Logger.LogWarning("Penumbra not available");
return;
}
if (!_ipcManager.CheckPenumbraApi())
{
Logger.LogWarning("Penumbra not available");
return;
}
var validatedCacheResult = _fileDbManager.ValidateFileCacheEntity(cache);
if (validatedCacheResult.State != FileState.RequireDeletion)
scannedFiles[validatedCacheResult.FileCache.ResolvedFilepath] = true;
if (validatedCacheResult.State == FileState.RequireUpdate)
{
Logger.LogTrace("To update: {path}", validatedCacheResult.FileCache.ResolvedFilepath);
lock (sync) { entitiesToUpdate.Add(validatedCacheResult.FileCache); }
var validatedCacheResult = _fileDbManager.ValidateFileCacheEntity(workload);
if (validatedCacheResult.State != FileState.RequireDeletion)
{
lock (sync) { allScannedFiles[validatedCacheResult.FileCache.ResolvedFilepath] = true; }
}
if (validatedCacheResult.State == FileState.RequireUpdate)
{
Logger.LogTrace("To update: {path}", validatedCacheResult.FileCache.ResolvedFilepath);
lock (sync) { entitiesToUpdate.Add(validatedCacheResult.FileCache); }
}
else if (validatedCacheResult.State == FileState.RequireDeletion)
{
Logger.LogTrace("To delete: {path}", validatedCacheResult.FileCache.ResolvedFilepath);
lock (sync) { entitiesToRemove.Add(validatedCacheResult.FileCache); }
}
}
else if (validatedCacheResult.State == FileState.RequireDeletion)
catch (Exception ex)
{
Logger.LogTrace("To delete: {path}", validatedCacheResult.FileCache.ResolvedFilepath);
lock (sync) { entitiesToRemove.Add(validatedCacheResult.FileCache); }
Logger.LogWarning(ex, "Failed validating {path}", workload.ResolvedFilepath);
}
}
catch (Exception ex)
{
Logger.LogWarning(ex, "Failed validating {path}", cache.ResolvedFilepath);
Interlocked.Increment(ref _currentFileProgress);
if (!noWaiting) Thread.Sleep(5);
}
Interlocked.Increment(ref _currentFileProgress);
if (!noWaiting) Thread.Sleep(5);
});
Logger.LogTrace("Ending Worker Thread {i}", threadNr);
})
{
Priority = ThreadPriority.Lowest,
IsBackground = true
};
workerThreads[i].Start(i);
}
}
catch (OperationCanceledException)
while (!ct.IsCancellationRequested && workerThreads.Any(u => u.IsAlive))
{
return;
}
catch (Exception ex)
{
Logger.LogWarning(ex, "Error during enumerating FileCaches");
Thread.Sleep(1000);
}
if (ct.IsCancellationRequested) return;
Logger.LogTrace("Threads exited");
if (!_ipcManager.CheckPenumbraApi())
{
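
The database validation pass above swaps Parallel.ForEach for a fixed pool of low-priority worker threads that drain a shared ConcurrentQueue and report progress through an Interlocked counter. A condensed, self-contained sketch of that pattern; the per-item validation is reduced to a caller-supplied delegate and the names are illustrative:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

public static class QueueWorkers
{
    // Drains 'queue' on 'threadCount' low-priority background threads and returns
    // the number of processed items once all workers have exited or cancellation hit.
    public static long ProcessAll<T>(ConcurrentQueue<T> queue, int threadCount,
        Action<T> processItem, CancellationToken ct)
    {
        var workers = new Thread[threadCount];
        long progress = 0;

        for (int i = 0; i < threadCount; i++)
        {
            // The real scanner additionally passes the loop index via Start(i)
            // so each worker can log its own thread number.
            workers[i] = new Thread(() =>
            {
                // Each worker pulls items until the queue runs dry or cancellation hits.
                while (!ct.IsCancellationRequested && queue.TryDequeue(out var workload))
                {
                    try
                    {
                        processItem(workload);
                    }
                    catch (Exception)
                    {
                        // Log and keep draining; one bad item must not kill the worker.
                    }
                    Interlocked.Increment(ref progress);
                }
            })
            {
                Priority = ThreadPriority.Lowest,
                IsBackground = true
            };
            workers[i].Start();
        }

        // Poll rather than Join so cancellation can be observed between checks.
        while (!ct.IsCancellationRequested && workers.Any(w => w.IsAlive))
        {
            Thread.Sleep(1000);
        }

        return Interlocked.Read(ref progress);
    }
}

Compared with Parallel.ForEach, this keeps every worker pinned at ThreadPriority.Lowest and lets the ConcurrentQueue itself hand out the work; results are still collected under a lock, exactly as the diff does with entitiesToUpdate and entitiesToRemove.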
@@ -312,12 +345,12 @@ public sealed class PeriodicFileScanner : DisposableMediatorSubscriberBase
if (ct.IsCancellationRequested) return;
// scan new files
if (scannedFiles.Any(c => !c.Value))
if (allScannedFiles.Any(c => !c.Value))
{
Parallel.ForEach(scannedFiles.Where(c => !c.Value).Select(c => c.Key),
Parallel.ForEach(allScannedFiles.Where(c => !c.Value).Select(c => c.Key),
new ParallelOptions()
{
MaxDegreeOfParallelism = cpuCount,
MaxDegreeOfParallelism = threadCount,
CancellationToken = ct
}, (cachePath) =>
{
@@ -343,14 +376,14 @@ public sealed class PeriodicFileScanner : DisposableMediatorSubscriberBase
if (!noWaiting) Thread.Sleep(5);
});
Logger.LogTrace("Scanner added {notScanned} new files to db", scannedFiles.Count(c => !c.Value));
Logger.LogTrace("Scanner added {notScanned} new files to db", allScannedFiles.Count(c => !c.Value));
}
Logger.LogDebug("Scan complete");
TotalFiles = 0;
_currentFileProgress = 0;
entitiesToRemove.Clear();
scannedFiles.Clear();
allScannedFiles.Clear();
if (!_configService.Current.InitialScanComplete)
{
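
Hashing files that are not yet in the database still goes through Parallel.ForEach, now bounded by the same threadCount value. A brief sketch of bounding Parallel.ForEach with ParallelOptions; the per-file action is a placeholder for the real "create cache entity" step:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class NewFileProcessor
{
    public static void Process(IEnumerable<string> newFiles, Action<string> createCacheEntry, CancellationToken ct)
    {
        // Roughly half the logical cores, clamped to 2..8, as in the scanner.
        var threadCount = Math.Clamp((int)(Environment.ProcessorCount / 2.0f), 2, 8);

        Parallel.ForEach(newFiles, new ParallelOptions
        {
            MaxDegreeOfParallelism = threadCount, // keep CPU usage bounded
            CancellationToken = ct                // cancellation surfaces as OperationCanceledException
        },
        cachePath =>
        {
            createCacheEntry(cachePath); // e.g. hash the file and insert it into the db
        });
    }
}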


@@ -578,7 +578,10 @@ public partial class UiSharedService : DisposableMediatorSubscriberBase
ImGui.SameLine();
ImGui.Text(_cacheScanner.TotalFiles == 1
? "Collecting files"
: $"Processing {_cacheScanner.CurrentFileProgress} / {_cacheScanner.TotalFiles} files");
: $"Processing {_cacheScanner.CurrentFileProgress}/{_cacheScanner.TotalFilesStorage} from storage ({_cacheScanner.TotalFiles} scanned in)");
AttachToolTip("Note: it is possible to have more files in storage than scanned in, " +
"this is due to the scanner normally ignoring those files but the game loading them in and using them on your character, so they get " +
"added to the local storage.");
}
else if (_configService.Current.FileScanPaused)
{


@@ -66,7 +66,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
base.Dispose(disposing);
}
private static void MungeBuffer(Span<byte> buffer)
public static void MungeBuffer(Span<byte> buffer)
{
for (int i = 0; i < buffer.Length; ++i)
{
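
MungeBuffer is only changed from private to public here so the upload path can reuse it; its loop body is cut off in the excerpt above. As an illustration of what such a munge typically is, here is a self-inverse XOR transform. The constant is an assumption for the sketch, not necessarily the value the plugin uses:

using System;

public static class MungeSketch
{
    // XORs every byte with a fixed constant. Applying the same transform twice
    // restores the original data, so one routine can both munge and un-munge.
    public static void MungeBuffer(Span<byte> buffer)
    {
        for (int i = 0; i < buffer.Length; ++i)
        {
            buffer[i] ^= 42; // illustrative constant only
        }
    }
}

Because the transform is its own inverse, the uploader can munge the compressed payload in place before sending it, and the receiving side can reverse it with the same call.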


@@ -145,10 +145,7 @@ public sealed class FileUploadManager : DisposableMediatorSubscriberBase
try
{
if (!_mareConfigService.Current.UseAlternativeFileUpload)
await UploadFileStream(compressedFile, fileHash, uploadToken).ConfigureAwait(false);
else
await UploadFileFull(compressedFile, fileHash, uploadToken).ConfigureAwait(false);
await UploadFileStream(compressedFile, fileHash, _mareConfigService.Current.UseAlternativeFileUpload, uploadToken).ConfigureAwait(false);
_verifiedUploadedHashes[fileHash] = DateTime.UtcNow;
}
catch (Exception ex)
@@ -156,7 +153,7 @@ public sealed class FileUploadManager : DisposableMediatorSubscriberBase
if (!_mareConfigService.Current.UseAlternativeFileUpload && ex is not OperationCanceledException)
{
Logger.LogWarning(ex, "[{hash}] Error during file upload, trying alternative file upload", fileHash);
await UploadFileFull(compressedFile, fileHash, uploadToken).ConfigureAwait(false);
await UploadFileStream(compressedFile, fileHash, true, uploadToken).ConfigureAwait(false);
}
else
{
@@ -165,18 +162,13 @@ public sealed class FileUploadManager : DisposableMediatorSubscriberBase
}
}
private async Task UploadFileFull(byte[] compressedFile, string fileHash, CancellationToken uploadToken)
private async Task UploadFileStream(byte[] compressedFile, string fileHash, bool munged, CancellationToken uploadToken)
{
using var content = new ByteArrayContent(compressedFile);
content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
if (munged)
{
FileDownloadManager.MungeBuffer(compressedFile.AsSpan());
}
var response = await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.ServerFilesUploadFullPath(_orchestrator.FilesCdnUri!, fileHash), content, uploadToken).ConfigureAwait(false);
Logger.LogDebug("[{hash}] Upload Status: {status}", fileHash, response.StatusCode);
CurrentUploads.Single(f => string.Equals(f.Hash, fileHash, StringComparison.Ordinal)).Transferred = compressedFile.Length;
}
private async Task UploadFileStream(byte[] compressedFile, string fileHash, CancellationToken uploadToken)
{
using var ms = new MemoryStream(compressedFile);
Progress<UploadProgress> prog = new((prog) =>
@@ -192,8 +184,11 @@ public sealed class FileUploadManager : DisposableMediatorSubscriberBase
});
var streamContent = new ProgressableStreamContent(ms, prog);
streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
var response = await _orchestrator.SendRequestStreamAsync(HttpMethod.Post, MareFiles.ServerFilesUploadFullPath(_orchestrator.FilesCdnUri!, fileHash), streamContent, uploadToken).ConfigureAwait(false);
HttpResponseMessage response;
if (!munged)
response = await _orchestrator.SendRequestStreamAsync(HttpMethod.Post, MareFiles.ServerFilesUploadFullPath(_orchestrator.FilesCdnUri!, fileHash), streamContent, uploadToken).ConfigureAwait(false);
else
response = await _orchestrator.SendRequestStreamAsync(HttpMethod.Post, MareFiles.ServerFilesUploadMunged(_orchestrator.FilesCdnUri!, fileHash), streamContent, uploadToken).ConfigureAwait(false);
Logger.LogDebug("[{hash}] Upload Status: {status}", fileHash, response.StatusCode);
}
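
Taken together, the FileUploadManager changes collapse UploadFileFull and the old UploadFileStream into one streaming upload with a munged flag that transforms the payload in place and switches the target endpoint. A reduced sketch of that shape using a plain HttpClient; the endpoint paths here are hypothetical placeholders for what MareFiles.ServerFilesUploadFullPath and ServerFilesUploadMunged build in the real code:

using System;
using System.IO;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;

public static class UploadSketch
{
    // Uploads a compressed payload, optionally munging it first and switching
    // to the munged-upload endpoint, mirroring the consolidated method above.
    public static async Task UploadAsync(HttpClient client, Uri cdnBase, string fileHash,
        byte[] compressedFile, bool munged, CancellationToken token)
    {
        if (munged)
        {
            MungeBuffer(compressedFile.AsSpan()); // reversible in-place transform
        }

        using var ms = new MemoryStream(compressedFile);
        using var content = new StreamContent(ms);
        content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");

        // Placeholder routes; the plugin derives the real ones from the MareFiles helpers.
        var path = munged ? $"files/uploadMunged/{fileHash}" : $"files/upload/{fileHash}";

        using var response = await client.PostAsync(new Uri(cdnBase, path), content, token).ConfigureAwait(false);
        response.EnsureSuccessStatusCode();
    }

    private static void MungeBuffer(Span<byte> buffer)
    {
        for (int i = 0; i < buffer.Length; ++i) buffer[i] ^= 42; // illustrative constant only
    }
}

With this shape, the fallback in the catch block above becomes a single call with munged set to true instead of a second upload implementation.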