Remote touch service for fileserver shards
This commit is contained in:
@@ -26,4 +26,21 @@ public class DistributionController : ControllerBase
|
||||
|
||||
return File(fs, "application/octet-stream");
|
||||
}
|
||||
|
||||
[HttpPost("touch")]
|
||||
[Authorize(Policy = "Internal")]
|
||||
public IActionResult TouchFiles([FromBody] string[] files)
|
||||
{
|
||||
_logger.LogInformation($"TouchFiles:{MareUser}:{files.Length}");
|
||||
|
||||
if (files.Length == 0)
|
||||
return Ok();
|
||||
|
||||
Task.Run(() => {
|
||||
foreach (var file in files)
|
||||
_cachedFileProvider.TouchColdHash(file);
|
||||
}).ConfigureAwait(false);
|
||||
|
||||
return Ok();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ public sealed class CachedFileProvider : IDisposable
|
||||
private readonly FileStatisticsService _fileStatisticsService;
|
||||
private readonly MareMetrics _metrics;
|
||||
private readonly ServerTokenGenerator _generator;
|
||||
private readonly ITouchHashService _touchService;
|
||||
private readonly Uri _remoteCacheSourceUri;
|
||||
private readonly bool _useColdStorage;
|
||||
private readonly string _hotStoragePath;
|
||||
@@ -28,7 +29,7 @@ public sealed class CachedFileProvider : IDisposable
|
||||
private bool _isDistributionServer;
|
||||
|
||||
public CachedFileProvider(IConfigurationService<StaticFilesServerConfiguration> configuration, ILogger<CachedFileProvider> logger,
|
||||
FileStatisticsService fileStatisticsService, MareMetrics metrics, ServerTokenGenerator generator)
|
||||
FileStatisticsService fileStatisticsService, MareMetrics metrics, ServerTokenGenerator generator, ITouchHashService touchService)
|
||||
{
|
||||
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
|
||||
_configuration = configuration;
|
||||
@@ -36,6 +37,7 @@ public sealed class CachedFileProvider : IDisposable
|
||||
_fileStatisticsService = fileStatisticsService;
|
||||
_metrics = metrics;
|
||||
_generator = generator;
|
||||
_touchService = touchService;
|
||||
_remoteCacheSourceUri = configuration.GetValueOrDefault<Uri>(nameof(StaticFilesServerConfiguration.DistributionFileServerAddress), null);
|
||||
_isDistributionServer = configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.IsDistributionNode), false);
|
||||
_useColdStorage = configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false);
|
||||
@@ -111,16 +113,15 @@ public sealed class CachedFileProvider : IDisposable
|
||||
|
||||
if (string.IsNullOrEmpty(_coldStoragePath)) return false;
|
||||
|
||||
var coldStorageFilePath = FilePathUtil.GetFileInfoForHash(_coldStoragePath, hash);
|
||||
var coldStorageFilePath = FilePathUtil.GetFilePath(_coldStoragePath, hash);
|
||||
if (coldStorageFilePath == null) return false;
|
||||
|
||||
try
|
||||
{
|
||||
_logger.LogDebug("Copying {hash} from cold storage: {path}", hash, coldStorageFilePath);
|
||||
var tempFileName = destinationFilePath + ".dl";
|
||||
File.Copy(coldStorageFilePath.FullName, tempFileName, true);
|
||||
File.Copy(coldStorageFilePath, tempFileName, true);
|
||||
File.Move(tempFileName, destinationFilePath, true);
|
||||
coldStorageFilePath.LastAccessTimeUtc = DateTime.UtcNow;
|
||||
var destinationFile = new FileInfo(destinationFilePath);
|
||||
destinationFile.LastAccessTimeUtc = DateTime.UtcNow;
|
||||
destinationFile.CreationTimeUtc = DateTime.UtcNow;
|
||||
@@ -180,9 +181,10 @@ public sealed class CachedFileProvider : IDisposable
|
||||
{
|
||||
var fi = FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash);
|
||||
if (fi == null) return null;
|
||||
|
||||
fi.LastAccessTimeUtc = DateTime.UtcNow;
|
||||
|
||||
_touchService.TouchColdHash(hash);
|
||||
|
||||
_fileStatisticsService.LogFile(hash, fi.Length);
|
||||
|
||||
return new FileStream(fi.FullName, FileMode.Open, FileAccess.Read, FileShare.Inheritable | FileShare.Read);
|
||||
@@ -215,6 +217,11 @@ public sealed class CachedFileProvider : IDisposable
|
||||
return GetLocalFileStream(hash);
|
||||
}
|
||||
|
||||
public void TouchColdHash(string hash)
|
||||
{
|
||||
_touchService.TouchColdHash(hash);
|
||||
}
|
||||
|
||||
public bool AnyFilesDownloading(List<string> hashes)
|
||||
{
|
||||
return hashes.Exists(_currentTransfers.Keys.Contains);
|
||||
|
||||
@@ -0,0 +1,68 @@
|
||||
using MareSynchronosShared.Services;
|
||||
using MareSynchronosStaticFilesServer.Utils;
|
||||
|
||||
namespace MareSynchronosStaticFilesServer.Services;
|
||||
|
||||
// Perform access time updates for cold cache files accessed via hot cache or shard servers
|
||||
public class ColdTouchHashService : ITouchHashService
|
||||
{
|
||||
private readonly ILogger<ColdTouchHashService> _logger;
|
||||
private readonly IConfigurationService<StaticFilesServerConfiguration> _configuration;
|
||||
|
||||
private readonly bool _useColdStorage;
|
||||
private readonly string _coldStoragePath;
|
||||
|
||||
// Debounce multiple updates towards the same file
|
||||
private readonly Dictionary<string, DateTime> _lastUpdateTimesUtc = new(1009, StringComparer.Ordinal);
|
||||
private int _cleanupCounter = 0;
|
||||
private const double _debounceTimeSecs = 90.0;
|
||||
|
||||
public ColdTouchHashService(ILogger<ColdTouchHashService> logger, IConfigurationService<StaticFilesServerConfiguration> configuration)
|
||||
{
|
||||
_logger = logger;
|
||||
_configuration = configuration;
|
||||
_useColdStorage = configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false);
|
||||
_coldStoragePath = configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.ColdStorageDirectory));
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public void TouchColdHash(string hash)
|
||||
{
|
||||
if (!_useColdStorage)
|
||||
return;
|
||||
|
||||
var nowUtc = DateTime.UtcNow;
|
||||
|
||||
// Clean up debounce dictionary regularly
|
||||
if (_cleanupCounter++ >= 1000)
|
||||
{
|
||||
foreach (var entry in _lastUpdateTimesUtc.Where(entry => (nowUtc - entry.Value).TotalSeconds >= _debounceTimeSecs).ToList())
|
||||
_lastUpdateTimesUtc.Remove(entry.Key);
|
||||
_cleanupCounter = 0;
|
||||
}
|
||||
|
||||
// Ignore multiple updates within a 90 second window of the first
|
||||
if (_lastUpdateTimesUtc.TryGetValue(hash, out var lastUpdateTimeUtc) && (nowUtc - lastUpdateTimeUtc).TotalSeconds < _debounceTimeSecs)
|
||||
{
|
||||
_logger.LogDebug($"Debounced touch for {hash}");
|
||||
return;
|
||||
}
|
||||
|
||||
var fileInfo = FilePathUtil.GetFileInfoForHash(_coldStoragePath, hash);
|
||||
if (fileInfo != null)
|
||||
{
|
||||
_logger.LogDebug($"Touching {fileInfo.Name}");
|
||||
fileInfo.LastAccessTimeUtc = nowUtc;
|
||||
_lastUpdateTimesUtc.TryAdd(hash, nowUtc);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
namespace MareSynchronosStaticFilesServer.Services;
|
||||
|
||||
public interface ITouchHashService : IHostedService
|
||||
{
|
||||
void TouchColdHash(string hash);
|
||||
}
|
||||
@@ -0,0 +1,131 @@
|
||||
using MareSynchronos.API.Routes;
|
||||
using MareSynchronosShared.Services;
|
||||
using MareSynchronosShared.Utils;
|
||||
using System.Net.Http.Headers;
|
||||
|
||||
namespace MareSynchronosStaticFilesServer.Services;
|
||||
|
||||
// Notify distribution server of file hashes downloaded via shards, so they are not prematurely purged from its cold cache
|
||||
public class ShardTouchMessageService : ITouchHashService
|
||||
{
|
||||
private readonly ILogger<ShardTouchMessageService> _logger;
|
||||
private readonly ServerTokenGenerator _tokenGenerator;
|
||||
private readonly IConfigurationService<StaticFilesServerConfiguration> _configuration;
|
||||
private readonly HttpClient _httpClient;
|
||||
private readonly Uri _remoteCacheSourceUri;
|
||||
private readonly HashSet<string> _touchHashSet = new();
|
||||
private readonly ColdTouchHashService _nestedService = null;
|
||||
|
||||
private CancellationTokenSource _touchmsgCts;
|
||||
|
||||
public ShardTouchMessageService(ILogger<ShardTouchMessageService> logger, ILogger<ColdTouchHashService> nestedLogger,
|
||||
ServerTokenGenerator tokenGenerator, IConfigurationService<StaticFilesServerConfiguration> configuration)
|
||||
{
|
||||
_logger = logger;
|
||||
_tokenGenerator = tokenGenerator;
|
||||
_configuration = configuration;
|
||||
_remoteCacheSourceUri = _configuration.GetValueOrDefault<Uri>(nameof(StaticFilesServerConfiguration.DistributionFileServerAddress), null);
|
||||
_httpClient = new();
|
||||
_httpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("MareSynchronosServer", "1.0.0.0"));
|
||||
|
||||
if (configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false))
|
||||
{
|
||||
_nestedService = new ColdTouchHashService(nestedLogger, configuration);
|
||||
}
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_remoteCacheSourceUri == null)
|
||||
return Task.CompletedTask;
|
||||
|
||||
_logger.LogInformation("Touch Message Service started");
|
||||
|
||||
_touchmsgCts = new();
|
||||
|
||||
_ = TouchMessageTask(_touchmsgCts.Token);
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_remoteCacheSourceUri == null)
|
||||
return Task.CompletedTask;
|
||||
|
||||
_touchmsgCts.Cancel();
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private async Task SendTouches(IEnumerable<string> hashes)
|
||||
{
|
||||
var mainUrl = _remoteCacheSourceUri;
|
||||
var path = new Uri(mainUrl, MareFiles.Distribution + "/touch");
|
||||
using HttpRequestMessage msg = new()
|
||||
{
|
||||
RequestUri = path
|
||||
};
|
||||
msg.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _tokenGenerator.Token);
|
||||
msg.Method = HttpMethod.Post;
|
||||
msg.Content = JsonContent.Create(hashes);
|
||||
if (_configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DistributionFileServerForceHTTP2), false))
|
||||
{
|
||||
msg.Version = new Version(2, 0);
|
||||
msg.VersionPolicy = HttpVersionPolicy.RequestVersionExact;
|
||||
}
|
||||
|
||||
_logger.LogDebug("Sending remote touch to {path}", path);
|
||||
try
|
||||
{
|
||||
using var result = await _httpClient.SendAsync(msg).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failure to send touches for {hashChunk}", hashes);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task TouchMessageTask(CancellationToken ct)
|
||||
{
|
||||
List<string> hashes;
|
||||
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
lock (_touchHashSet)
|
||||
{
|
||||
hashes = _touchHashSet.ToList();
|
||||
_touchHashSet.Clear();
|
||||
}
|
||||
if (hashes.Count > 0)
|
||||
await SendTouches(hashes);
|
||||
await Task.Delay(TimeSpan.FromSeconds(30), ct).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_logger.LogError(e, "Error during touch message task");
|
||||
}
|
||||
}
|
||||
|
||||
lock (_touchHashSet)
|
||||
{
|
||||
hashes = _touchHashSet.ToList();
|
||||
_touchHashSet.Clear();
|
||||
}
|
||||
if (hashes.Count > 0)
|
||||
await SendTouches(hashes);
|
||||
}
|
||||
|
||||
public void TouchColdHash(string hash)
|
||||
{
|
||||
if (_nestedService != null)
|
||||
_nestedService.TouchColdHash(hash);
|
||||
|
||||
lock (_touchHashSet)
|
||||
{
|
||||
_touchHashSet.Add(hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -175,6 +175,17 @@ public class Startup
|
||||
services.AddHostedService(p => (MareConfigurationServiceClient<StaticFilesServerConfiguration>)p.GetService<IConfigurationService<StaticFilesServerConfiguration>>());
|
||||
}
|
||||
|
||||
if (_isDistributionNode)
|
||||
{
|
||||
services.AddSingleton<ITouchHashService, ColdTouchHashService>();
|
||||
services.AddHostedService(p => p.GetService<ITouchHashService>());
|
||||
}
|
||||
else
|
||||
{
|
||||
services.AddSingleton<ITouchHashService, ShardTouchMessageService>();
|
||||
services.AddHostedService(p => p.GetService<ITouchHashService>());
|
||||
}
|
||||
|
||||
// controller setup
|
||||
services.AddControllers().ConfigureApplicationPartManager(a =>
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user