using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Net; using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; using LZ4; using MareSynchronos.API; using MareSynchronos.FileCacheDB; using MareSynchronos.Utils; using MareSynchronos.WebAPI.Utils; using Microsoft.AspNetCore.SignalR.Client; namespace MareSynchronos.WebAPI { public partial class ApiController { private readonly HashSet _verifiedUploadedHashes; private int _downloadId = 0; public void CancelUpload() { if (_uploadCancellationTokenSource != null) { Logger.Debug("Cancelling upload"); _uploadCancellationTokenSource?.Cancel(); _mareHub!.SendAsync(Api.SendFileAbortUpload); CurrentUploads.Clear(); } } public async Task DeleteAllMyFiles() { await _mareHub!.SendAsync(Api.SendFileDeleteAllFiles); } private async Task DownloadFile(int downloadId, string hash, Uri downloadUri, CancellationToken ct) { using WebClient wc = new(); wc.Headers.Add("Authorization", SecretKey); DownloadProgressChangedEventHandler progChanged = (s, e) => { try { CurrentDownloads[downloadId].Single(f => f.Hash == hash).Transferred = e.BytesReceived; } catch (Exception ex) { Logger.Warn("Could not set download progress for " + hash); Logger.Warn(ex.Message); Logger.Warn(ex.StackTrace ?? string.Empty); } }; wc.DownloadProgressChanged += progChanged; string fileName = Path.GetTempFileName(); ct.Register(wc.CancelAsync); try { await wc.DownloadFileTaskAsync(downloadUri, fileName); } catch { } CurrentDownloads[downloadId].Single(f => f.Hash == hash).Transferred = CurrentDownloads[downloadId].Single(f => f.Hash == hash).Total; wc.DownloadProgressChanged -= progChanged; return fileName; } public int GetDownloadId() => _downloadId++; public async Task DownloadFiles(int currentDownloadId, List fileReplacementDto, CancellationToken ct) { DownloadStarted?.Invoke(); Logger.Debug("Downloading files (Download ID " + currentDownloadId + ")"); List downloadFileInfoFromService = new List(); downloadFileInfoFromService.AddRange(await _mareHub!.InvokeAsync>(Api.InvokeGetFilesSizes, fileReplacementDto.Select(f => f.Hash).ToList(), ct)); Logger.Debug("Files with size 0 or less: " + string.Join(", ", downloadFileInfoFromService.Where(f => f.Size <= 0).Select(f => f.Hash))); CurrentDownloads[currentDownloadId] = downloadFileInfoFromService.Distinct().Select(d => new DownloadFileTransfer(d)) .Where(d => d.CanBeTransferred).ToList(); foreach (var dto in downloadFileInfoFromService.Where(c => c.IsForbidden)) { if (ForbiddenTransfers.All(f => f.Hash != dto.Hash)) { ForbiddenTransfers.Add(new DownloadFileTransfer(dto)); } } await Parallel.ForEachAsync(CurrentDownloads[currentDownloadId].Where(f => f.CanBeTransferred), new ParallelOptions() { MaxDegreeOfParallelism = 5, CancellationToken = ct }, async (file, token) => { var hash = file.Hash; var tempFile = await DownloadFile(currentDownloadId, file.Hash, file.DownloadUri, token); if (token.IsCancellationRequested) { File.Delete(tempFile); Logger.Debug("Detected cancellation, removing " + currentDownloadId); DownloadFinished?.Invoke(); CancelDownload(currentDownloadId); return; } var tempFileData = await File.ReadAllBytesAsync(tempFile, token); var extractedFile = LZ4Codec.Unwrap(tempFileData); File.Delete(tempFile); var filePath = Path.Combine(_pluginConfiguration.CacheFolder, file.Hash); await File.WriteAllBytesAsync(filePath, extractedFile, token); var fi = new FileInfo(filePath); Func RandomDayFunc() { DateTime start = new DateTime(1995, 1, 1); Random gen = new Random(); int range = (DateTime.Today - start).Days; return () => start.AddDays(gen.Next(range)); } fi.CreationTime = RandomDayFunc().Invoke(); fi.LastAccessTime = RandomDayFunc().Invoke(); fi.LastWriteTime = RandomDayFunc().Invoke(); try { _ = _fileDbManager.CreateFileCacheEntity(filePath); } catch (Exception ex) { Logger.Warn("Issue adding file to the DB"); Logger.Warn(ex.Message); Logger.Warn(ex.StackTrace); } }); Logger.Debug("Download complete, removing " + currentDownloadId); DownloadFinished?.Invoke(); CancelDownload(currentDownloadId); } public async Task PushCharacterData(CharacterCacheDto character, List visibleCharacterIds) { if (!IsConnected || SecretKey == "-") return; Logger.Debug("Sending Character data to service " + ApiUri); CancelUpload(); _uploadCancellationTokenSource = new CancellationTokenSource(); var uploadToken = _uploadCancellationTokenSource.Token; Logger.Verbose("New Token Created"); List unverifiedUploadHashes = new(); foreach (var item in character.FileReplacements.SelectMany(c => c.Value.Where(f => string.IsNullOrEmpty(f.FileSwapPath)).Select(v => v.Hash).Distinct()).Distinct().ToList()) { if (!_verifiedUploadedHashes.Contains(item)) { unverifiedUploadHashes.Add(item); } } if (unverifiedUploadHashes.Any()) { Logger.Debug("Verifying " + unverifiedUploadHashes.Count + " files"); var filesToUpload = await _mareHub!.InvokeAsync>(Api.InvokeFileSendFiles, unverifiedUploadHashes, uploadToken); foreach (var file in filesToUpload.Where(f => !f.IsForbidden)) { await using var db = new FileCacheContext(); try { CurrentUploads.Add(new UploadFileTransfer(file) { Total = new FileInfo(db.FileCaches.FirstOrDefault(f => f.Hash.ToLower() == file.Hash.ToLower()) ?.Filepath ?? string.Empty).Length }); } catch (Exception ex) { Logger.Warn("Tried to request file " + file.Hash + " but file was not present"); Logger.Warn(ex.StackTrace!); } } await using (var db = new FileCacheContext()) { foreach (var file in filesToUpload.Where(c => c.IsForbidden)) { if (ForbiddenTransfers.All(f => f.Hash != file.Hash)) { ForbiddenTransfers.Add(new UploadFileTransfer(file) { LocalFile = db.FileCaches.FirstOrDefault(f => f.Hash == file.Hash)?.Filepath ?? string.Empty }); } } } var totalSize = CurrentUploads.Sum(c => c.Total); Logger.Debug("Compressing and uploading files"); foreach (var file in CurrentUploads.Where(f => f.CanBeTransferred && !f.IsTransferred).ToList()) { Logger.Debug("Compressing and uploading " + file); var data = await GetCompressedFileData(file.Hash, uploadToken); CurrentUploads.Single(e => e.Hash == data.Item1).Total = data.Item2.Length; await UploadFile(data.Item2, file.Hash, uploadToken); if (!uploadToken.IsCancellationRequested) continue; Logger.Warn("Cancel in filesToUpload loop detected"); CurrentUploads.Clear(); break; } if (CurrentUploads.Any()) { var compressedSize = CurrentUploads.Sum(c => c.Total); Logger.Debug($"Compressed {totalSize} to {compressedSize} ({(compressedSize / (double)totalSize):P2})"); } Logger.Debug("Upload tasks complete, waiting for server to confirm"); var anyUploadsOpen = await _mareHub!.InvokeAsync(Api.InvokeFileIsUploadFinished, uploadToken); Logger.Debug("Uploads open: " + anyUploadsOpen); while (anyUploadsOpen && !uploadToken.IsCancellationRequested) { anyUploadsOpen = await _mareHub!.InvokeAsync(Api.InvokeFileIsUploadFinished, uploadToken); await Task.Delay(TimeSpan.FromSeconds(0.5), uploadToken); Logger.Debug("Waiting for uploads to finish"); } foreach (var item in unverifiedUploadHashes) { _verifiedUploadedHashes.Add(item); } CurrentUploads.Clear(); } else { Logger.Debug("All files already verified"); } if (!uploadToken.IsCancellationRequested) { Logger.Info("Pushing character data for " + character.GetHashCode() + " to " + string.Join(", ", visibleCharacterIds)); StringBuilder sb = new StringBuilder(); foreach (var item in character.FileReplacements) { sb.AppendLine($"FileReplacements for {item.Key}: {item.Value.Count}"); } foreach (var item in character.GlamourerData) { sb.AppendLine($"GlamourerData for {item.Key}: {!string.IsNullOrEmpty(item.Value)}"); } Logger.Debug("Chara data contained: " + Environment.NewLine + sb.ToString()); await _mareHub!.InvokeAsync(Api.InvokeUserPushCharacterDataToVisibleClients, character, visibleCharacterIds, uploadToken); } else { Logger.Warn("=== Upload operation was cancelled ==="); } Logger.Verbose("Upload complete for " + character.GetHashCode()); _uploadCancellationTokenSource = null; } private async Task<(string, byte[])> GetCompressedFileData(string fileHash, CancellationToken uploadToken) { await using var db = new FileCacheContext(); var fileCache = db.FileCaches.First(f => f.Hash == fileHash); return (fileHash, LZ4Codec.WrapHC(await File.ReadAllBytesAsync(fileCache.Filepath, uploadToken), 0, (int)new FileInfo(fileCache.Filepath).Length)); } private async Task UploadFile(byte[] compressedFile, string fileHash, CancellationToken uploadToken) { if (uploadToken.IsCancellationRequested) return; async IAsyncEnumerable AsyncFileData([EnumeratorCancellation] CancellationToken token) { var chunkSize = 1024 * 512; // 512kb using var ms = new MemoryStream(compressedFile); var buffer = new byte[chunkSize]; int bytesRead; while ((bytesRead = await ms.ReadAsync(buffer, 0, chunkSize, token)) > 0 && !token.IsCancellationRequested) { CurrentUploads.Single(f => f.Hash == fileHash).Transferred += bytesRead; token.ThrowIfCancellationRequested(); yield return bytesRead == chunkSize ? buffer.ToArray() : buffer.Take(bytesRead).ToArray(); } } await _mareHub!.SendAsync(Api.SendFileUploadFileStreamAsync, fileHash, AsyncFileData(uploadToken), uploadToken); } public void CancelDownload(int downloadId) { while (CurrentDownloads.ContainsKey(downloadId)) { CurrentDownloads.TryRemove(downloadId, out _); } } } }