From 9fa65174864d2e1ed19feb1a01e77f9dabf6992e Mon Sep 17 00:00:00 2001 From: Stanley Dimant Date: Mon, 20 Jun 2022 00:49:57 +0200 Subject: [PATCH] add streaming for uploads/aborting, whitelist changes and so on --- .../MareSynchronosServer/Hubs/Files.cs | 98 +++++++++++++++---- .../MareSynchronosServer/Hubs/User.cs | 77 +++++++++++---- .../MareSynchronosServer/Startup.cs | 4 +- 3 files changed, 139 insertions(+), 40 deletions(-) diff --git a/MareSynchronosServer/MareSynchronosServer/Hubs/Files.cs b/MareSynchronosServer/MareSynchronosServer/Hubs/Files.cs index 5b2ae84..283e6cc 100644 --- a/MareSynchronosServer/MareSynchronosServer/Hubs/Files.cs +++ b/MareSynchronosServer/MareSynchronosServer/Hubs/Files.cs @@ -1,10 +1,14 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; +using System.Globalization; using System.IO; using System.Linq; using System.Security.Claims; using System.Security.Cryptography; +using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using MareSynchronos.API; using MareSynchronosServer.Authentication; @@ -18,14 +22,25 @@ namespace MareSynchronosServer.Hubs { public class Files : BaseHub { + private static readonly ConcurrentDictionary UserUploads = new(); public Files(MareDbContext dbContext) : base(dbContext) { } [Authorize(AuthenticationSchemes = SecretKeyAuthenticationHandler.AUTH_SCHEME)] - public async Task SendFiles(List fileList) + public async Task AbortUpload() + { + var userId = AuthenticatedUserId; + var notUploadedFiles = DbContext.Files.Where(f => !f.Uploaded && f.Uploader.UID == userId).ToList(); + DbContext.RemoveRange(notUploadedFiles); + await DbContext.SaveChangesAsync(); + } + + [Authorize(AuthenticationSchemes = SecretKeyAuthenticationHandler.AUTH_SCHEME)] + public async Task> SendFiles(List fileList) { var fileListHashes = fileList.Select(x => x.Hash).ToList(); + List filesToUpload = new List(); var existingFiles = DbContext.Files.Where(f => fileListHashes.Contains(f.Hash)).ToList(); foreach (var file in fileListHashes.Where(f => existingFiles.All(e => e.Hash != f))) { @@ -38,8 +53,10 @@ namespace MareSynchronosServer.Hubs Uploader = DbContext.Users.Single(u => u.UID == userId) }); await DbContext.SaveChangesAsync(); - await Clients.Caller!.SendAsync("FileRequest", file); + filesToUpload.Add(file); } + + return filesToUpload; } [Authorize(AuthenticationSchemes = SecretKeyAuthenticationHandler.AUTH_SCHEME)] @@ -50,37 +67,78 @@ namespace MareSynchronosServer.Hubs } [Authorize(AuthenticationSchemes = SecretKeyAuthenticationHandler.AUTH_SCHEME)] - public async Task UploadFile(string hash, byte[] file) + public async Task UploadFile(string hash, ChannelReader stream) { var relatedFile = DbContext.Files.SingleOrDefault(f => f.Hash == hash); - if (relatedFile == null) return false; - var decodedFile = LZ4.LZ4Codec.Unwrap(file); - using var sha1 = new SHA1CryptoServiceProvider(); - var computedHash = await sha1.ComputeHashAsync(new MemoryStream(decodedFile)); - var computedHashString = BitConverter.ToString(computedHash).Replace("-", ""); - if (hash != computedHashString) + if (relatedFile == null) return; + List uploadedFile = new(); + while (await stream.WaitToReadAsync()) { - DbContext.Remove(relatedFile); - await DbContext.SaveChangesAsync(); - return false; + while (stream.TryRead(out var byteChunk)) + { + uploadedFile.AddRange(byteChunk); + } + } + Debug.WriteLine(DateTime.Now.ToString(CultureInfo.InvariantCulture) + ": File size of " + hash + ":" + uploadedFile.Count); + try + { + var decodedFile = LZ4.LZ4Codec.Unwrap(uploadedFile.ToArray()); + using var sha1 = new SHA1CryptoServiceProvider(); + var computedHash = await sha1.ComputeHashAsync(new MemoryStream(decodedFile)); + var computedHashString = BitConverter.ToString(computedHash).Replace("-", ""); + if (hash != computedHashString) + { + DbContext.Remove(relatedFile); + await DbContext.SaveChangesAsync(); + return; + } + + await File.WriteAllBytesAsync(@"G:\ServerTest\" + hash, uploadedFile.ToArray()); + relatedFile = DbContext.Files.Single(f => f.Hash == hash); + relatedFile.Uploaded = true; + relatedFile.LastAccessTime = DateTime.Now; + await DbContext.SaveChangesAsync(); + return; + } + catch (Exception ex) + { + Debug.Write(ex.Message); } - await File.WriteAllBytesAsync(@"G:\ServerTest\" + hash, file); - relatedFile.Uploaded = true; - relatedFile.LastAccessTime = DateTime.Now; - await DbContext.SaveChangesAsync(); - return true; } [Authorize(AuthenticationSchemes = SecretKeyAuthenticationHandler.AUTH_SCHEME)] - public async Task DownloadFile(string hash) + public async Task GetFileSize(string hash) + { + var file = await DbContext.Files.SingleOrDefaultAsync(f => f.Hash == hash); + if (file == null) return -1; + return new FileInfo(@"G:\ServerTest\" + hash).Length; + } + + [Authorize(AuthenticationSchemes = SecretKeyAuthenticationHandler.AUTH_SCHEME)] + public async Task> DownloadFile(string hash) { var file = DbContext.Files.SingleOrDefault(f => f.Hash == hash); - if (file == null) return Array.Empty(); - return await File.ReadAllBytesAsync(@"G:\ServerTest\" + hash); + if (file == null) return null; + var compressedFile = await File.ReadAllBytesAsync(@"G:\ServerTest\" + hash); + var chunkSize = 1024 * 512; // 512kb + var chunks = (int)Math.Ceiling(compressedFile.Length / (double)chunkSize); + var channel = Channel.CreateBounded(chunkSize); + _ = Task.Run(() => + { + for (var i = 0; i < chunks; i++) + { + channel.Writer.TryWrite(compressedFile.Skip(i * chunkSize).Take(chunkSize).ToArray()); + } + + channel.Writer.Complete(); + }); + + return channel.Reader; } public override Task OnDisconnectedAsync(Exception exception) { + Debug.WriteLine("Detected disconnect from " + AuthenticatedUserId); var userId = AuthenticatedUserId; var notUploadedFiles = DbContext.Files.Where(f => !f.Uploaded && f.Uploader.UID == userId).ToList(); DbContext.RemoveRange(notUploadedFiles); diff --git a/MareSynchronosServer/MareSynchronosServer/Hubs/User.cs b/MareSynchronosServer/MareSynchronosServer/Hubs/User.cs index 4fc6c81..6705de6 100644 --- a/MareSynchronosServer/MareSynchronosServer/Hubs/User.cs +++ b/MareSynchronosServer/MareSynchronosServer/Hubs/User.cs @@ -83,8 +83,7 @@ namespace MareSynchronosServer.Hubs var whitelistEntriesHavingThisUser = DbContext.Whitelists .Include(w => w.User) .Include(w => w.OtherUser) - .Where(w => w.OtherUser.UID == uid && !w.IsPaused - && visibleCharacterWithJobs.Keys.Contains(w.User.CharacterIdentification)) + .Where(w => w.OtherUser.UID == uid && !w.IsPaused && visibleCharacterWithJobs.Keys.Contains(w.User.CharacterIdentification)) .ToList(); foreach (var whiteListEntry in whitelistEntriesHavingThisUser) { @@ -97,7 +96,13 @@ namespace MareSynchronosServer.Hubs DbContext.CharacterData.SingleOrDefaultAsync(c => c.UserId == whiteListEntry.User.UID && c.JobId == dictEntry); if (cachedChar != null) { - await Clients.User(uid).SendAsync("ReceiveCharacterData", cachedChar, + await Clients.User(uid).SendAsync("ReceiveCharacterData", new CharacterCacheDto() + { + FileReplacements = cachedChar.EquipmentData, + Hash = cachedChar.Hash, + JobId = cachedChar.JobId, + GlamourerData = cachedChar.GlamourerData + }, whiteListEntry.User.CharacterIdentification); } } @@ -134,24 +139,39 @@ namespace MareSynchronosServer.Hubs JobId = characterCache.JobId }; await DbContext.CharacterData.AddAsync(data); + await DbContext.SaveChangesAsync(); } - await DbContext.SaveChangesAsync(); - - foreach (var whitelistEntry in whitelistEntriesHavingThisUser) + if ((existingCharacterData != null && existingCharacterData.Hash != characterCache.Hash) || existingCharacterData == null) { - var ownEntry = DbContext.Whitelists.SingleOrDefault(w => - w.User.UID == uid && w.OtherUser.UID == whitelistEntry.User.UID); - if (ownEntry == null || ownEntry.IsPaused) continue; - await Clients.User(whitelistEntry.User.UID).SendAsync("ReceiveCharacterData", characterCache, whitelistEntry.OtherUser.CharacterIdentification); + foreach (var whitelistEntry in whitelistEntriesHavingThisUser) + { + var ownEntry = DbContext.Whitelists.SingleOrDefault(w => + w.User.UID == uid && w.OtherUser.UID == whitelistEntry.User.UID); + if (ownEntry == null || ownEntry.IsPaused) continue; + await Clients.User(whitelistEntry.User.UID).SendAsync("ReceiveCharacterData", characterCache, + whitelistEntry.OtherUser.CharacterIdentification); + } } } [Authorize(AuthenticationSchemes = SecretKeyAuthenticationHandler.AUTH_SCHEME)] - public async Task SendCharacterNameHash(string characterNameHash) + public async Task> SendCharacterNameHash(string characterNameHash) { - DbContext.Users.Single(u => u.UID == AuthenticatedUserId).CharacterIdentification = characterNameHash; + var ownUser = DbContext.Users.Single(u => u.UID == AuthenticatedUserId); + ownUser.CharacterIdentification = characterNameHash; await DbContext.SaveChangesAsync(); + var otherUsers = await DbContext.Whitelists + .Include(u => u.User) + .Include(u => u.OtherUser) + .Where(w => w.User == ownUser) + .Where(w => !string.IsNullOrEmpty(w.OtherUser.CharacterIdentification)) + .Select(e => e.OtherUser).ToListAsync(); + var otherEntries = await DbContext.Whitelists.Include(u => u.User) + .Where(u => otherUsers.Any(e => e == u.User) && u.OtherUser == ownUser).ToListAsync(); + + await Clients.Users(otherEntries.Select(e => e.User.UID)).SendAsync("AddOnlineWhitelistedPlayer", characterNameHash); + return otherEntries.Select(e => e.User.CharacterIdentification).ToList(); } [Authorize(AuthenticationSchemes = SecretKeyAuthenticationHandler.AUTH_SCHEME)] @@ -179,6 +199,14 @@ namespace MareSynchronosServer.Hubs }, otherUser.CharacterIdentification); if (otherEntry != null) { + if (string.IsNullOrEmpty(otherUser.CharacterIdentification)) + { + await Clients.User(user.UID) + .SendAsync("AddOnlineWhitelistedPlayer", otherUser.CharacterIdentification); + await Clients.User(otherUser.UID) + .SendAsync("AddOnlineWhitelistedPlayer", user.CharacterIdentification); + } + await Clients.User(uid).SendAsync("UpdateWhitelist", new WhitelistDto() { @@ -211,6 +239,13 @@ namespace MareSynchronosServer.Hubs }, otherUser.CharacterIdentification); if (otherEntry != null) { + if (string.IsNullOrEmpty(otherUser.CharacterIdentification)) + { + await Clients.User(user.UID) + .SendAsync("RemoveOnlineWhitelistedPlayer", otherUser.CharacterIdentification); + await Clients.User(otherUser.UID) + .SendAsync("RemoveOnlineWhitelistedPlayer", user.CharacterIdentification); + } await Clients.User(uid).SendAsync("UpdateWhitelist", new WhitelistDto() { OtherUID = user.UID, @@ -233,6 +268,7 @@ namespace MareSynchronosServer.Hubs DbContext.Update(wl); await DbContext.SaveChangesAsync(); var otherEntry = OppositeEntry(uid); + await Clients.User(user.UID) .SendAsync("UpdateWhitelist", new WhitelistDto() { @@ -266,13 +302,6 @@ namespace MareSynchronosServer.Hubs .Select(w => { var otherEntry = OppositeEntry(w.OtherUser.UID); - var otherUser = GetUserFromUID(w.OtherUser.UID); - var seesYou = false; - if (otherEntry != null) - { - seesYou = DbContext.Visibilities.Any(v => - v.CID == otherUser.CharacterIdentification && v.OtherCID == user.CharacterIdentification); - } return new WhitelistDto { IsPaused = w.IsPaused, @@ -289,6 +318,16 @@ namespace MareSynchronosServer.Hubs var user = DbContext.Users.SingleOrDefault(u => u.UID == AuthenticatedUserId); if (user != null) { + var otherUsers = DbContext.Whitelists + .Include(u => u.User) + .Include(u => u.OtherUser) + .Where(w => w.User == user) + .Where(w => !string.IsNullOrEmpty(w.OtherUser.CharacterIdentification)) + .Select(e => e.OtherUser).ToList(); + var otherEntries = DbContext.Whitelists.Include(u => u.User) + .Where(u => otherUsers.Any(e => e == u.User) && u.OtherUser == user).ToList(); + _ = Clients.Users(otherEntries.Select(e => e.User.UID)).SendAsync("RemoveOnlineWhitelistedPlayer", user.CharacterIdentification); + var outdatedVisibilities = DbContext.Visibilities.Where(v => v.CID == user.CharacterIdentification); DbContext.RemoveRange(outdatedVisibilities); var outdatedCharacterData = DbContext.CharacterData.Where(v => v.UserId == user.UID); diff --git a/MareSynchronosServer/MareSynchronosServer/Startup.cs b/MareSynchronosServer/MareSynchronosServer/Startup.cs index 2c73d6f..ec7b52b 100644 --- a/MareSynchronosServer/MareSynchronosServer/Startup.cs +++ b/MareSynchronosServer/MareSynchronosServer/Startup.cs @@ -49,7 +49,9 @@ namespace MareSynchronosServer opts.MimeTypes = ResponseCompressionDefaults.MimeTypes.Concat(new[] { "application/octet-stream" }); }); services.AddDbContext(options => - options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"))); + { + options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")); + }); services.AddDatabaseDeveloperPageExceptionFilter(); services.AddAuthentication(options => options.DefaultScheme = SecretKeyAuthenticationHandler.AUTH_SCHEME) .AddScheme(SecretKeyAuthenticationHandler.AUTH_SCHEME, options => { });