diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/RequestController.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/RequestController.cs index 833d4a6..031ab67 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/RequestController.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Controllers/RequestController.cs @@ -9,11 +9,13 @@ public class RequestController : ControllerBase { private readonly CachedFileProvider _cachedFileProvider; private readonly RequestQueueService _requestQueue; + private readonly FilePreFetchService _preFetchService; - public RequestController(ILogger logger, CachedFileProvider cachedFileProvider, RequestQueueService requestQueue) : base(logger) + public RequestController(ILogger logger, CachedFileProvider cachedFileProvider, RequestQueueService requestQueue, FilePreFetchService preFetchService) : base(logger) { _cachedFileProvider = cachedFileProvider; _requestQueue = requestQueue; + _preFetchService = preFetchService; } [HttpGet] @@ -34,14 +36,21 @@ public class RequestController : ControllerBase { try { - foreach (var file in files) + var hashList = files.ToList(); + var fileList = new List(); + + foreach (var file in hashList) { _logger.LogDebug("Prerequested file: " + file); - await _cachedFileProvider.DownloadFileWhenRequired(file).ConfigureAwait(false); + var fileInfo = await _cachedFileProvider.DownloadFileWhenRequired(file).ConfigureAwait(false); + if (fileInfo != null) + fileList.Add(fileInfo); } + _preFetchService.PrefetchFiles(fileList); + Guid g = Guid.NewGuid(); - await _requestQueue.EnqueueUser(new(g, MareUser, files.ToList()), IsPriority, HttpContext.RequestAborted); + await _requestQueue.EnqueueUser(new(g, MareUser, hashList), IsPriority, HttpContext.RequestAborted); return Ok(g); } diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/CachedFileProvider.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/CachedFileProvider.cs index 1c8d897..2a8536b 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/CachedFileProvider.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/CachedFileProvider.cs @@ -123,35 +123,42 @@ public sealed class CachedFileProvider : IDisposable File.Copy(coldStorageFilePath, tempFileName, true); File.Move(tempFileName, destinationFilePath, true); var destinationFile = new FileInfo(destinationFilePath); - destinationFile.LastAccessTimeUtc = DateTime.UtcNow; - destinationFile.CreationTimeUtc = DateTime.UtcNow; - destinationFile.LastWriteTimeUtc = DateTime.UtcNow; _metrics.IncGauge(MetricsAPI.GaugeFilesTotal); _metrics.IncGauge(MetricsAPI.GaugeFilesTotalSize, new FileInfo(destinationFilePath).Length); return true; } catch (Exception ex) { + // Recover from a fairly common race condition -- max wait time is 75ms + // Having TryCopyFromColdStorage protected by the downloadtask mutex doesn't work for some reason? + for (int retry = 0; retry < 5; ++retry) + { + Thread.Sleep(5 + retry * 5); + if (File.Exists(destinationFilePath)) + return true; + } _logger.LogWarning(ex, "Could not copy {coldStoragePath} from cold storage", coldStorageFilePath); } return false; } - public async Task DownloadFileWhenRequired(string hash) + // Returns FileInfo ONLY if the hot file was immediately available without downloading + // Since the intended use is for pre-fetching files from hot storage, this is exactly what we need anyway + public async Task DownloadFileWhenRequired(string hash) { var fi = FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash); if (fi != null && fi.Length != 0) - return; + return fi; // first check cold storage if (TryCopyFromColdStorage(hash, FilePathUtil.GetFilePath(_hotStoragePath, hash))) - return; + return null; // no distribution server configured to download from if (_remoteCacheSourceUri == null) - return; + return null; await _downloadSemaphore.WaitAsync().ConfigureAwait(false); if (!_currentTransfers.TryGetValue(hash, out var downloadTask) || (downloadTask?.IsCompleted ?? true)) @@ -176,13 +183,15 @@ public sealed class CachedFileProvider : IDisposable }); } _downloadSemaphore.Release(); + + return null; } public async Task GetAndDownloadFile(string hash) { - await DownloadFileWhenRequired(hash).ConfigureAwait(false); + var fi = await DownloadFileWhenRequired(hash).ConfigureAwait(false); - if (_currentTransfers.TryGetValue(hash, out var downloadTask)) + if (fi == null && _currentTransfers.TryGetValue(hash, out var downloadTask)) { try { @@ -202,7 +211,8 @@ public sealed class CachedFileProvider : IDisposable } } - var fi = FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash); + fi ??= FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash); + if (fi == null) return null; diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/ColdTouchHashService.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/ColdTouchHashService.cs index b9c3d00..d9118b6 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/ColdTouchHashService.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/ColdTouchHashService.cs @@ -64,16 +64,17 @@ public class ColdTouchHashService : ITouchHashService // Ignore multiple updates within a time window of the first if (_lastUpdateTimesUtc.TryGetValue(hash, out var lastUpdateTimeUtc) && (nowUtc - lastUpdateTimeUtc).TotalSeconds < _debounceTimeSecs) - { - _logger.LogDebug($"Debounced touch for {hash}"); return; - } var fileInfo = FilePathUtil.GetFileInfoForHash(_coldStoragePath, hash); if (fileInfo != null) { - _logger.LogDebug($"Touching {fileInfo.Name}"); - fileInfo.LastAccessTimeUtc = nowUtc; + _logger.LogTrace("Touching {fileName}", fileInfo.Name); + try + { + fileInfo.LastAccessTimeUtc = nowUtc; + } + catch (IOException) { return; } _lastUpdateTimesUtc.TryAdd(hash, nowUtc); } } diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/FilePreFetchService.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/FilePreFetchService.cs new file mode 100644 index 0000000..1dc8a6e --- /dev/null +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/FilePreFetchService.cs @@ -0,0 +1,125 @@ +using Microsoft.Win32.SafeHandles; +using System.Runtime.InteropServices; +using System.Threading.Channels; + +namespace MareSynchronosStaticFilesServer.Services; + +// Pre-fetch files from cache storage in to memory +public class FilePreFetchService : IHostedService +{ + private struct PreFetchRequest + { + public FileInfo FileInfo; + public DateTime ExpiryUtc; + } + + private readonly ILogger _logger; + + private CancellationTokenSource _prefetchCts; + private readonly Channel _prefetchChannel; + + private const int _readAheadBytes = 8 * 1024 * 1024; // Maximum number of of bytes to prefetch per file (8MB) + private const int _preFetchTasks = 4; // Maximum number of tasks to process prefetches concurrently + + // Use readahead() on linux if its available + [DllImport("libc", EntryPoint = "readahead")] + static extern int LinuxReadAheadExternal(SafeFileHandle fd, Int64 offset, int count); + + private bool _hasLinuxReadAhead = true; + + public FilePreFetchService(ILogger logger) + { + _logger = logger; + _prefetchChannel = Channel.CreateUnbounded(); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + _logger.LogInformation("File PreFetch Service started"); + _prefetchCts = new(); + for (int i = 0; i < _preFetchTasks; ++i) + _ = PrefetchTask(_prefetchCts.Token); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + _prefetchCts.Cancel(); + return Task.CompletedTask; + } + + // Queue a list of hashes to be prefetched in a background task + public void PrefetchFiles(ICollection fileList) + { + if (!_hasLinuxReadAhead) + { + if (!_prefetchCts.IsCancellationRequested) + { + _logger.LogError("readahead() is not available - aborting File PreFetch Service"); + _prefetchCts.Cancel(); + } + return; + } + + var nowUtc = DateTime.UtcNow; + + // Expire prefetch requests that aren't picked up within 500ms + // By this point the request is probably already being served, or things are moving too slow to matter anyway + var expiry = nowUtc + TimeSpan.FromMilliseconds(500); + + foreach (var fileInfo in fileList) + { + _ = _prefetchChannel.Writer.TryWrite(new PreFetchRequest(){ + FileInfo = fileInfo, + ExpiryUtc = expiry, + }); + } + } + + private async Task PrefetchTask(CancellationToken ct) + { + var reader = _prefetchChannel.Reader; + + while (!ct.IsCancellationRequested) + { + try + { + var req = await reader.ReadAsync(ct).ConfigureAwait(false); + var nowUtc = DateTime.UtcNow; + + if (nowUtc >= req.ExpiryUtc) + { + _logger.LogDebug("Skipped expired prefetch for {hash}", req.FileInfo.Name); + continue; + } + + try + { + var fs = new FileStream(req.FileInfo.FullName, FileMode.Open, FileAccess.Read, FileShare.Inheritable | FileShare.Read); + + await using (fs.ConfigureAwait(false)) + { + try + { + _ = LinuxReadAheadExternal(fs.SafeFileHandle, 0, _readAheadBytes); + _logger.LogTrace("Prefetched {hash}", req.FileInfo.Name); + } + catch (EntryPointNotFoundException) + { + _hasLinuxReadAhead = false; + } + } + } + catch (IOException) { } + } + catch (OperationCanceledException) + { + continue; + } + catch (Exception e) + { + _logger.LogError(e, "Error during prefetch task"); + } + } + } +} diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/ShardTouchMessageService.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/ShardTouchMessageService.cs index 2a88b36..78a59df 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/ShardTouchMessageService.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Services/ShardTouchMessageService.cs @@ -100,7 +100,7 @@ public class ShardTouchMessageService : ITouchHashService _touchHashSet.Clear(); } if (hashes.Count > 0) - await SendTouches(hashes); + await SendTouches(hashes).ConfigureAwait(false); await Task.Delay(TimeSpan.FromSeconds(60), ct).ConfigureAwait(false); } catch (Exception e) @@ -115,7 +115,7 @@ public class ShardTouchMessageService : ITouchHashService _touchHashSet.Clear(); } if (hashes.Count > 0) - await SendTouches(hashes); + await SendTouches(hashes).ConfigureAwait(false); } public void TouchColdHash(string hash) diff --git a/MareSynchronosServer/MareSynchronosStaticFilesServer/Startup.cs b/MareSynchronosServer/MareSynchronosStaticFilesServer/Startup.cs index 5520bdf..a49e525 100644 --- a/MareSynchronosServer/MareSynchronosStaticFilesServer/Startup.cs +++ b/MareSynchronosServer/MareSynchronosStaticFilesServer/Startup.cs @@ -90,6 +90,8 @@ public class Startup services.AddSingleton(); services.AddSingleton(); services.AddHostedService(p => p.GetService()); + services.AddSingleton(); + services.AddHostedService(p => p.GetService()); services.AddHostedService(m => m.GetService()); services.AddSingleton, MareConfigurationServiceClient>(); services.AddHostedService(p => (MareConfigurationServiceClient)p.GetService>());