Pre-fetch cache files upon pre-request

This commit is contained in:
Loporrit
2024-09-29 13:38:40 +00:00
parent 61821c0775
commit c400d9746f
6 changed files with 168 additions and 21 deletions

View File

@@ -9,11 +9,13 @@ public class RequestController : ControllerBase
{
private readonly CachedFileProvider _cachedFileProvider;
private readonly RequestQueueService _requestQueue;
private readonly FilePreFetchService _preFetchService;
public RequestController(ILogger<RequestController> logger, CachedFileProvider cachedFileProvider, RequestQueueService requestQueue) : base(logger)
public RequestController(ILogger<RequestController> logger, CachedFileProvider cachedFileProvider, RequestQueueService requestQueue, FilePreFetchService preFetchService) : base(logger)
{
_cachedFileProvider = cachedFileProvider;
_requestQueue = requestQueue;
_preFetchService = preFetchService;
}
[HttpGet]
@@ -34,14 +36,21 @@ public class RequestController : ControllerBase
{
try
{
foreach (var file in files)
var hashList = files.ToList();
var fileList = new List<FileInfo>();
foreach (var file in hashList)
{
_logger.LogDebug("Prerequested file: " + file);
await _cachedFileProvider.DownloadFileWhenRequired(file).ConfigureAwait(false);
var fileInfo = await _cachedFileProvider.DownloadFileWhenRequired(file).ConfigureAwait(false);
if (fileInfo != null)
fileList.Add(fileInfo);
}
_preFetchService.PrefetchFiles(fileList);
Guid g = Guid.NewGuid();
await _requestQueue.EnqueueUser(new(g, MareUser, files.ToList()), IsPriority, HttpContext.RequestAborted);
await _requestQueue.EnqueueUser(new(g, MareUser, hashList), IsPriority, HttpContext.RequestAborted);
return Ok(g);
}

View File

@@ -123,35 +123,42 @@ public sealed class CachedFileProvider : IDisposable
File.Copy(coldStorageFilePath, tempFileName, true);
File.Move(tempFileName, destinationFilePath, true);
var destinationFile = new FileInfo(destinationFilePath);
destinationFile.LastAccessTimeUtc = DateTime.UtcNow;
destinationFile.CreationTimeUtc = DateTime.UtcNow;
destinationFile.LastWriteTimeUtc = DateTime.UtcNow;
_metrics.IncGauge(MetricsAPI.GaugeFilesTotal);
_metrics.IncGauge(MetricsAPI.GaugeFilesTotalSize, new FileInfo(destinationFilePath).Length);
return true;
}
catch (Exception ex)
{
// Recover from a fairly common race condition -- max wait time is 75ms
// Having TryCopyFromColdStorage protected by the downloadtask mutex doesn't work for some reason?
for (int retry = 0; retry < 5; ++retry)
{
Thread.Sleep(5 + retry * 5);
if (File.Exists(destinationFilePath))
return true;
}
_logger.LogWarning(ex, "Could not copy {coldStoragePath} from cold storage", coldStorageFilePath);
}
return false;
}
public async Task DownloadFileWhenRequired(string hash)
// Returns FileInfo ONLY if the hot file was immediately available without downloading
// Since the intended use is for pre-fetching files from hot storage, this is exactly what we need anyway
public async Task<FileInfo?> DownloadFileWhenRequired(string hash)
{
var fi = FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash);
if (fi != null && fi.Length != 0)
return;
return fi;
// first check cold storage
if (TryCopyFromColdStorage(hash, FilePathUtil.GetFilePath(_hotStoragePath, hash)))
return;
return null;
// no distribution server configured to download from
if (_remoteCacheSourceUri == null)
return;
return null;
await _downloadSemaphore.WaitAsync().ConfigureAwait(false);
if (!_currentTransfers.TryGetValue(hash, out var downloadTask) || (downloadTask?.IsCompleted ?? true))
@@ -176,13 +183,15 @@ public sealed class CachedFileProvider : IDisposable
});
}
_downloadSemaphore.Release();
return null;
}
public async Task<FileInfo?> GetAndDownloadFile(string hash)
{
await DownloadFileWhenRequired(hash).ConfigureAwait(false);
var fi = await DownloadFileWhenRequired(hash).ConfigureAwait(false);
if (_currentTransfers.TryGetValue(hash, out var downloadTask))
if (fi == null && _currentTransfers.TryGetValue(hash, out var downloadTask))
{
try
{
@@ -202,7 +211,8 @@ public sealed class CachedFileProvider : IDisposable
}
}
var fi = FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash);
fi ??= FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash);
if (fi == null)
return null;

View File

@@ -64,16 +64,17 @@ public class ColdTouchHashService : ITouchHashService
// Ignore multiple updates within a time window of the first
if (_lastUpdateTimesUtc.TryGetValue(hash, out var lastUpdateTimeUtc) && (nowUtc - lastUpdateTimeUtc).TotalSeconds < _debounceTimeSecs)
{
_logger.LogDebug($"Debounced touch for {hash}");
return;
}
var fileInfo = FilePathUtil.GetFileInfoForHash(_coldStoragePath, hash);
if (fileInfo != null)
{
_logger.LogDebug($"Touching {fileInfo.Name}");
fileInfo.LastAccessTimeUtc = nowUtc;
_logger.LogTrace("Touching {fileName}", fileInfo.Name);
try
{
fileInfo.LastAccessTimeUtc = nowUtc;
}
catch (IOException) { return; }
_lastUpdateTimesUtc.TryAdd(hash, nowUtc);
}
}

View File

@@ -0,0 +1,125 @@
using Microsoft.Win32.SafeHandles;
using System.Runtime.InteropServices;
using System.Threading.Channels;
namespace MareSynchronosStaticFilesServer.Services;
// Pre-fetch files from cache storage in to memory
public class FilePreFetchService : IHostedService
{
private struct PreFetchRequest
{
public FileInfo FileInfo;
public DateTime ExpiryUtc;
}
private readonly ILogger<FilePreFetchService> _logger;
private CancellationTokenSource _prefetchCts;
private readonly Channel<PreFetchRequest> _prefetchChannel;
private const int _readAheadBytes = 8 * 1024 * 1024; // Maximum number of of bytes to prefetch per file (8MB)
private const int _preFetchTasks = 4; // Maximum number of tasks to process prefetches concurrently
// Use readahead() on linux if its available
[DllImport("libc", EntryPoint = "readahead")]
static extern int LinuxReadAheadExternal(SafeFileHandle fd, Int64 offset, int count);
private bool _hasLinuxReadAhead = true;
public FilePreFetchService(ILogger<FilePreFetchService> logger)
{
_logger = logger;
_prefetchChannel = Channel.CreateUnbounded<PreFetchRequest>();
}
public Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("File PreFetch Service started");
_prefetchCts = new();
for (int i = 0; i < _preFetchTasks; ++i)
_ = PrefetchTask(_prefetchCts.Token);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_prefetchCts.Cancel();
return Task.CompletedTask;
}
// Queue a list of hashes to be prefetched in a background task
public void PrefetchFiles(ICollection<FileInfo> fileList)
{
if (!_hasLinuxReadAhead)
{
if (!_prefetchCts.IsCancellationRequested)
{
_logger.LogError("readahead() is not available - aborting File PreFetch Service");
_prefetchCts.Cancel();
}
return;
}
var nowUtc = DateTime.UtcNow;
// Expire prefetch requests that aren't picked up within 500ms
// By this point the request is probably already being served, or things are moving too slow to matter anyway
var expiry = nowUtc + TimeSpan.FromMilliseconds(500);
foreach (var fileInfo in fileList)
{
_ = _prefetchChannel.Writer.TryWrite(new PreFetchRequest(){
FileInfo = fileInfo,
ExpiryUtc = expiry,
});
}
}
private async Task PrefetchTask(CancellationToken ct)
{
var reader = _prefetchChannel.Reader;
while (!ct.IsCancellationRequested)
{
try
{
var req = await reader.ReadAsync(ct).ConfigureAwait(false);
var nowUtc = DateTime.UtcNow;
if (nowUtc >= req.ExpiryUtc)
{
_logger.LogDebug("Skipped expired prefetch for {hash}", req.FileInfo.Name);
continue;
}
try
{
var fs = new FileStream(req.FileInfo.FullName, FileMode.Open, FileAccess.Read, FileShare.Inheritable | FileShare.Read);
await using (fs.ConfigureAwait(false))
{
try
{
_ = LinuxReadAheadExternal(fs.SafeFileHandle, 0, _readAheadBytes);
_logger.LogTrace("Prefetched {hash}", req.FileInfo.Name);
}
catch (EntryPointNotFoundException)
{
_hasLinuxReadAhead = false;
}
}
}
catch (IOException) { }
}
catch (OperationCanceledException)
{
continue;
}
catch (Exception e)
{
_logger.LogError(e, "Error during prefetch task");
}
}
}
}

View File

@@ -100,7 +100,7 @@ public class ShardTouchMessageService : ITouchHashService
_touchHashSet.Clear();
}
if (hashes.Count > 0)
await SendTouches(hashes);
await SendTouches(hashes).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromSeconds(60), ct).ConfigureAwait(false);
}
catch (Exception e)
@@ -115,7 +115,7 @@ public class ShardTouchMessageService : ITouchHashService
_touchHashSet.Clear();
}
if (hashes.Count > 0)
await SendTouches(hashes);
await SendTouches(hashes).ConfigureAwait(false);
}
public void TouchColdHash(string hash)

View File

@@ -90,6 +90,8 @@ public class Startup
services.AddSingleton<ServerTokenGenerator>();
services.AddSingleton<RequestQueueService>();
services.AddHostedService(p => p.GetService<RequestQueueService>());
services.AddSingleton<FilePreFetchService>();
services.AddHostedService(p => p.GetService<FilePreFetchService>());
services.AddHostedService(m => m.GetService<FileStatisticsService>());
services.AddSingleton<IConfigurationService<MareConfigurationAuthBase>, MareConfigurationServiceClient<MareConfigurationAuthBase>>();
services.AddHostedService(p => (MareConfigurationServiceClient<MareConfigurationAuthBase>)p.GetService<IConfigurationService<MareConfigurationAuthBase>>());