diff --git a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.Admin.cs b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.Admin.cs index 2c7db77..e4305e3 100644 --- a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.Admin.cs +++ b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.Admin.cs @@ -96,13 +96,13 @@ public partial class MareHub if (!IsModerator) return null; var users = await _dbContext.Users.AsNoTracking().ToListAsync().ConfigureAwait(false); - return users.Where(c => !string.IsNullOrEmpty(_clientIdentService.GetCharacterIdentForUid(c.UID).Result)).Select(async b => new OnlineUserDto + return users.Where(c => !string.IsNullOrEmpty(_clientIdentService.GetCharacterIdentForUid(c.UID))).Select(b => new OnlineUserDto { - CharacterNameHash = await _clientIdentService.GetCharacterIdentForUid(b.UID).ConfigureAwait(false), + CharacterNameHash = _clientIdentService.GetCharacterIdentForUid(b.UID), UID = b.UID, IsModerator = b.IsModerator, IsAdmin = b.IsAdmin - }).Select(c => c.Result).ToList(); + }).ToList(); } [Authorize(AuthenticationSchemes = SecretKeyGrpcAuthenticationHandler.AuthScheme)] @@ -128,7 +128,7 @@ public partial class MareHub await _dbContext.SaveChangesAsync().ConfigureAwait(false); await Clients.Users(OnlineAdmins).Client_AdminUpdateOrAddBannedUser(dto).ConfigureAwait(false); - var bannedUser = await _clientIdentService.GetUidForCharacterIdent(dto.CharacterHash).ConfigureAwait(false); + var bannedUser = _clientIdentService.GetUidForCharacterIdent(dto.CharacterHash); if (!string.IsNullOrEmpty(bannedUser)) { await Clients.User(bannedUser).Client_AdminForcedReconnect().ConfigureAwait(false); diff --git a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.Functions.cs b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.Functions.cs index fde98db..363f522 100644 --- a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.Functions.cs +++ b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.Functions.cs @@ -86,7 +86,7 @@ public partial class MareHub if (userPair.IsPausedPerGroup is PauseInfo.Unpaused) return; } - var groupUserIdent = await _clientIdentService.GetCharacterIdentForUid(groupUserPair.GroupUserUID).ConfigureAwait(false); + var groupUserIdent = _clientIdentService.GetCharacterIdentForUid(groupUserPair.GroupUserUID); if (!string.IsNullOrEmpty(groupUserIdent)) { await Clients.User(uid).Client_UserChangePairedPlayer(groupUserIdent, false).ConfigureAwait(false); @@ -98,7 +98,7 @@ public partial class MareHub { foreach (var pair in groupUsers) { - var pairIdent = await _clientIdentService.GetCharacterIdentForUid(pair.GroupUserUID).ConfigureAwait(false); + var pairIdent = _clientIdentService.GetCharacterIdentForUid(pair.GroupUserUID); if (string.IsNullOrEmpty(pairIdent)) continue; var pairs = await GetAllPairedClientsWithPauseState(pair.GroupUserUID).ConfigureAwait(false); diff --git a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.Groups.cs b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.Groups.cs index 7a867d3..4823e9d 100644 --- a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.Groups.cs +++ b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.Groups.cs @@ -220,7 +220,7 @@ public partial class MareHub var allUserPairs = await GetAllPairedClientsWithPauseState().ConfigureAwait(false); - var userIdent = await _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId).ConfigureAwait(false); + var userIdent = _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId); foreach (var groupUserPair in groupPairs) { var userPair = allUserPairs.Single(p => string.Equals(p.UID, groupUserPair.GroupUserUID, StringComparison.Ordinal)); @@ -228,7 +228,7 @@ public partial class MareHub if (userPair.IsPausedExcludingGroup(gid) is PauseInfo.Unpaused) continue; if (userPair.IsPausedPerGroup is PauseInfo.Paused) continue; - var groupUserIdent = await _clientIdentService.GetCharacterIdentForUid(groupUserPair.GroupUserUID).ConfigureAwait(false); + var groupUserIdent = _clientIdentService.GetCharacterIdentForUid(groupUserPair.GroupUserUID); if (!string.IsNullOrEmpty(groupUserIdent)) { await Clients.User(AuthenticatedUserId).Client_UserChangePairedPlayer(groupUserIdent, true).ConfigureAwait(false); @@ -315,7 +315,7 @@ public partial class MareHub var allUserPairs = await GetAllPairedClientsWithPauseState().ConfigureAwait(false); - var userIdent = await _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId).ConfigureAwait(false); + var userIdent = _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId); foreach (var groupUserPair in groupPairsWithoutSelf) { await UserGroupLeave(groupUserPair, allUserPairs, userIdent).ConfigureAwait(false); @@ -351,7 +351,7 @@ public partial class MareHub var allUserPairs = await GetAllPairedClientsWithPauseState().ConfigureAwait(false); - var userIdent = await _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId).ConfigureAwait(false); + var userIdent = _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId); foreach (var groupUserPair in groupPairs) { var userPair = allUserPairs.SingleOrDefault(p => string.Equals(p.UID, groupUserPair.GroupUserUID, StringComparison.Ordinal)); @@ -361,7 +361,7 @@ public partial class MareHub if (userPair.IsPausedExcludingGroup(gid) is PauseInfo.Unpaused) continue; } - var groupUserIdent = await _clientIdentService.GetCharacterIdentForUid(groupUserPair.GroupUserUID).ConfigureAwait(false); + var groupUserIdent = _clientIdentService.GetCharacterIdentForUid(groupUserPair.GroupUserUID); if (!string.IsNullOrEmpty(groupUserIdent)) { await Clients.User(AuthenticatedUserId).Client_UserChangePairedPlayer(groupUserIdent, !isPaused).ConfigureAwait(false); @@ -395,7 +395,7 @@ public partial class MareHub UserUID = uid, }).ConfigureAwait(false); - var userIdent = await _clientIdentService.GetCharacterIdentForUid(uid).ConfigureAwait(false); + var userIdent = _clientIdentService.GetCharacterIdentForUid(uid); if (userIdent == null) return; await Clients.User(uid).Client_GroupChange(new GroupDto() @@ -641,7 +641,7 @@ public partial class MareHub UserUID = pair.GroupUserUID }).ConfigureAwait(false); - var pairIdent = await _clientIdentService.GetCharacterIdentForUid(pair.GroupUserUID).ConfigureAwait(false); + var pairIdent = _clientIdentService.GetCharacterIdentForUid(pair.GroupUserUID); if (string.IsNullOrEmpty(pairIdent)) continue; var allUserPairs = await GetAllPairedClientsWithPauseState(pair.GroupUserUID).ConfigureAwait(false); diff --git a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.User.cs b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.User.cs index f82ccc5..b10a49a 100644 --- a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.User.cs +++ b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.User.cs @@ -22,7 +22,7 @@ public partial class MareHub string userid = AuthenticatedUserId; var userEntry = await _dbContext.Users.SingleAsync(u => u.UID == userid).ConfigureAwait(false); - var charaIdent = await _clientIdentService.GetCharacterIdentForUid(userid).ConfigureAwait(false); + var charaIdent = _clientIdentService.GetCharacterIdentForUid(userid); var ownPairData = await _dbContext.ClientPairs.Where(u => u.User.UID == userid).ToListAsync().ConfigureAwait(false); var auth = await _dbContext.Auth.SingleAsync(u => u.UserUID == userid).ConfigureAwait(false); var lodestone = await _dbContext.LodeStoneAuth.SingleOrDefaultAsync(a => a.User.UID == userid).ConfigureAwait(false); @@ -71,10 +71,10 @@ public partial class MareHub { _logger.LogCallInfo(); - var ownIdent = await _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId).ConfigureAwait(false); + var ownIdent = _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId); var usersToSendOnlineTo = await SendOnlineToAllPairedUsers(ownIdent).ConfigureAwait(false); - return usersToSendOnlineTo.Select(async e => await _clientIdentService.GetCharacterIdentForUid(e).ConfigureAwait(false)).Select(t => t.Result).Where(t => !string.IsNullOrEmpty(t)).Distinct(System.StringComparer.Ordinal).ToList(); + return usersToSendOnlineTo.Select(e => _clientIdentService.GetCharacterIdentForUid(e)).Where(t => !string.IsNullOrEmpty(t)).Distinct(System.StringComparer.Ordinal).ToList(); } [Authorize(AuthenticationSchemes = SecretKeyGrpcAuthenticationHandler.AuthScheme)] @@ -125,10 +125,10 @@ public partial class MareHub var allPairedUsers = await GetAllPairedUnpausedUsers().ConfigureAwait(false); - var allPairedUsersDict = allPairedUsers.ToDictionary(f => f, async f => await _clientIdentService.GetCharacterIdentForUid(f).ConfigureAwait(false), System.StringComparer.Ordinal) - .Where(f => visibleCharacterIds.Contains(f.Value.Result, System.StringComparer.Ordinal)); + var allPairedUsersDict = allPairedUsers.ToDictionary(f => f, f => _clientIdentService.GetCharacterIdentForUid(f), System.StringComparer.Ordinal) + .Where(f => visibleCharacterIds.Contains(f.Value, System.StringComparer.Ordinal)); - var ownIdent = await _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId).ConfigureAwait(false); + var ownIdent = _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId); _logger.LogCallInfo(MareHubLogger.Args(visibleCharacterIds.Count, allPairedUsersDict.Count())); @@ -185,7 +185,7 @@ public partial class MareHub if (otherEntry == null) return; // check if other user is online - var otherIdent = await _clientIdentService.GetCharacterIdentForUid(otherUser.UID).ConfigureAwait(false); + var otherIdent = _clientIdentService.GetCharacterIdentForUid(otherUser.UID); if (otherIdent == null) return; // send push with update to other user if other user is online @@ -200,7 +200,7 @@ public partial class MareHub }).ConfigureAwait(false); // get own ident and all pairs - var userIdent = await _clientIdentService.GetCharacterIdentForUid(user.UID).ConfigureAwait(false); + var userIdent = _clientIdentService.GetCharacterIdentForUid(user.UID); var allUserPairs = await GetAllPairedClientsWithPauseState().ConfigureAwait(false); // if the other user has paused the main user and there was no previous group connection don't send anything @@ -246,8 +246,8 @@ public partial class MareHub IsSynced = true }).ConfigureAwait(false); - var selfCharaIdent = await _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId).ConfigureAwait(false); - var otherCharaIdent = await _clientIdentService.GetCharacterIdentForUid(pair.OtherUserUID).ConfigureAwait(false); + var selfCharaIdent = _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId); + var otherCharaIdent = _clientIdentService.GetCharacterIdentForUid(pair.OtherUserUID); if (selfCharaIdent == null || otherCharaIdent == null || otherEntry.IsPaused) return; @@ -287,7 +287,7 @@ public partial class MareHub if (oppositeClientPair == null) return; // check if other user is online, if no then there is no need to do anything further - var otherIdent = await _clientIdentService.GetCharacterIdentForUid(otherUserUid).ConfigureAwait(false); + var otherIdent = _clientIdentService.GetCharacterIdentForUid(otherUserUid); if (otherIdent == null) return; // get own ident and @@ -313,7 +313,7 @@ public partial class MareHub // if neither user had paused each other and either is not in an unpaused group with each other, change state to offline if (!callerHadPaused && !otherHadPaused && isPausedInGroup) { - var userIdent = await _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId).ConfigureAwait(false); + var userIdent = _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId); await Clients.User(AuthenticatedUserId).Client_UserChangePairedPlayer(otherIdent, false).ConfigureAwait(false); await Clients.User(otherUserUid).Client_UserChangePairedPlayer(userIdent, false).ConfigureAwait(false); } @@ -321,7 +321,7 @@ public partial class MareHub // if the caller had paused other but not the other has paused the caller and they are in an unpaused group together, change state to online if (callerHadPaused && !otherHadPaused && !isPausedInGroup) { - var userIdent = await _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId).ConfigureAwait(false); + var userIdent = _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId); await Clients.User(AuthenticatedUserId).Client_UserChangePairedPlayer(otherIdent, true).ConfigureAwait(false); await Clients.User(otherUserUid).Client_UserChangePairedPlayer(userIdent, true).ConfigureAwait(false); } diff --git a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.cs b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.cs index 8b1c878..b1eb543 100644 --- a/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.cs +++ b/MareSynchronosServer/MareSynchronosServer/Hubs/MareHub.cs @@ -70,7 +70,7 @@ public partial class MareHub : Hub, IMareHub if (!string.IsNullOrEmpty(userId) && !isBanned && !string.IsNullOrEmpty(characterIdentification)) { var user = (await _dbContext.Users.SingleAsync(u => u.UID == userId).ConfigureAwait(false)); - var existingIdent = await _clientIdentService.GetCharacterIdentForUid(userId).ConfigureAwait(false); + var existingIdent = _clientIdentService.GetCharacterIdentForUid(userId); if (!string.IsNullOrEmpty(existingIdent) && !string.Equals(characterIdentification, existingIdent, StringComparison.Ordinal)) { _logger.LogCallWarning(MareHubLogger.Args(characterIdentification, "Failure", "LoggedIn")); @@ -82,7 +82,7 @@ public partial class MareHub : Hub, IMareHub } user.LastLoggedIn = DateTime.UtcNow; - await _clientIdentService.MarkUserOnline(user.UID, characterIdentification).ConfigureAwait(false); + _clientIdentService.MarkUserOnline(user.UID, characterIdentification); await _dbContext.SaveChangesAsync().ConfigureAwait(false); _logger.LogCallInfo(MareHubLogger.Args(characterIdentification, "Success")); @@ -114,7 +114,7 @@ public partial class MareHub : Hub, IMareHub [Authorize(AuthenticationSchemes = SecretKeyGrpcAuthenticationHandler.AuthScheme)] public async Task CheckClientHealth() { - var serverId = await _clientIdentService.GetServerForUid(AuthenticatedUserId).ConfigureAwait(false); + var serverId = _clientIdentService.GetServerForUid(AuthenticatedUserId); bool needsReconnect = false; if (string.IsNullOrEmpty(serverId) || !string.Equals(serverId, _shardName, StringComparison.Ordinal)) { @@ -135,7 +135,7 @@ public partial class MareHub : Hub, IMareHub { _mareMetrics.DecGauge(MetricsAPI.GaugeConnections); - var userCharaIdent = await _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId).ConfigureAwait(false); + var userCharaIdent = _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId); if (!string.IsNullOrEmpty(userCharaIdent)) { @@ -147,7 +147,7 @@ public partial class MareHub : Hub, IMareHub _dbContext.RemoveRange(_dbContext.Files.Where(f => !f.Uploaded && f.UploaderUID == AuthenticatedUserId)); - await _clientIdentService.MarkUserOffline(AuthenticatedUserId).ConfigureAwait(false); + _clientIdentService.MarkUserOffline(AuthenticatedUserId); await _dbContext.SaveChangesAsync().ConfigureAwait(false); } diff --git a/MareSynchronosServer/MareSynchronosServer/Services/GrpcClientIdentificationService.cs b/MareSynchronosServer/MareSynchronosServer/Services/GrpcClientIdentificationService.cs index 1bbdcff..7f35c86 100644 --- a/MareSynchronosServer/MareSynchronosServer/Services/GrpcClientIdentificationService.cs +++ b/MareSynchronosServer/MareSynchronosServer/Services/GrpcClientIdentificationService.cs @@ -18,11 +18,12 @@ public class GrpcClientIdentificationService : IHostedService private readonly ILogger _logger; private readonly IdentificationService.IdentificationServiceClient _grpcIdentClient; private readonly MareMetrics _metrics; - protected ConcurrentDictionary OnlineClients = new(StringComparer.Ordinal); - private ConcurrentDictionary RemoteCachedIdents = new(StringComparer.Ordinal); - private readonly TimeSpan InvalidateCachedIdent = TimeSpan.FromSeconds(30); + protected readonly ConcurrentDictionary OnlineClients = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary RemoteCachedIdents = new(StringComparer.Ordinal); private bool _grpcIsFaulty = false; - private CancellationTokenSource _cacheVerificationCts = new(); + private ConcurrentQueue _identChangeQueue = new(); + private CancellationTokenSource _streamCts = new(); + private CancellationTokenSource _faultCheckCts = new(); public GrpcClientIdentificationService(ILogger logger, IdentificationService.IdentificationServiceClient gprcIdentClient, MareMetrics metrics, IConfiguration configuration) { @@ -33,74 +34,34 @@ public class GrpcClientIdentificationService : IHostedService _metrics = metrics; } - private async Task HandleCacheVerification(CancellationToken ct) + public string? GetCharacterIdentForUid(string uid) { - while (!ct.IsCancellationRequested) + if (OnlineClients.TryGetValue(uid, out var ident)) { - try - { - if (RemoteCachedIdents.Any()) - { - MultiUidMessage req = new(); - req.Uids.AddRange(RemoteCachedIdents.Select(k => new UidMessage() { Uid = k.Key })); - var cacheResponse = await InvokeOnGrpc(_grpcIdentClient.ValidateCachedIdentsAsync(req)).ConfigureAwait(false); - - foreach (var entry in cacheResponse.UidWithIdent) - { - if (!RemoteCachedIdents.TryGetValue(entry.Uid.Uid, out var ident)) - { - continue; - } - - if (string.IsNullOrEmpty(entry.Ident.Ident)) - { - RemoteCachedIdents.TryRemove(entry.Uid.Uid, out var _); - continue; - } - - RemoteCachedIdents[entry.Uid.Uid] = entry.Ident.Ident; - } - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, "Error during cached idents verification"); - } - finally - { - await Task.Delay(InvalidateCachedIdent, ct).ConfigureAwait(false); - } - } - } - - public async Task GetCharacterIdentForUid(string uid) - { - if (OnlineClients.TryGetValue(uid, out string ident)) - { - return ident; + return ident.Ident.Ident; } if (RemoteCachedIdents.TryGetValue(uid, out var cachedIdent)) { - return cachedIdent; + return cachedIdent.Ident.Ident; } - var result = await InvokeOnGrpc(_grpcIdentClient.GetIdentForUidAsync(new UidMessage { Uid = uid })).ConfigureAwait(false); - if (result == default(CharacterIdentMessage)) return null; - RemoteCachedIdents[uid] = result.Ident; - return result.Ident; + return null; } - public async Task GetServerForUid(string uid) + public string? GetServerForUid(string uid) { if (OnlineClients.ContainsKey(uid)) { return _shardName; } - var result = await InvokeOnGrpc(_grpcIdentClient.GetIdentForUidAsync(new UidMessage { Uid = uid })).ConfigureAwait(false); - if (result == default(CharacterIdentMessage)) return null; - return result.ServerId; + if (RemoteCachedIdents.TryGetValue(uid, out var cachedIdent)) + { + return cachedIdent.Ident.ServerId; + } + + return null; } public async Task GetOnlineUsers() @@ -110,46 +71,172 @@ public class GrpcClientIdentificationService : IHostedService return result.Count; } - public async Task GetUidForCharacterIdent(string characterIdent) + public string? GetUidForCharacterIdent(string characterIdent) { - bool existsLocal = OnlineClients.Any(o => string.Equals(o.Value, characterIdent, StringComparison.Ordinal)); + bool existsLocal = OnlineClients.Any(o => string.Equals(o.Value.Ident.Ident, characterIdent, StringComparison.Ordinal)); if (existsLocal) { - return OnlineClients.First(c => string.Equals(c.Value, characterIdent, StringComparison.Ordinal)).Key; + return OnlineClients.First(c => string.Equals(c.Value.Ident.Ident, characterIdent, StringComparison.Ordinal)).Key; } - var result = await InvokeOnGrpc(_grpcIdentClient.GetUidForCharacterIdentAsync(new CharacterIdentMessage { Ident = characterIdent, ServerId = string.Empty })).ConfigureAwait(false); - if (result == default(UidMessage)) return null; - return result.Uid; + bool existsCached = RemoteCachedIdents.Any(o => string.Equals(o.Value.Ident.Ident, characterIdent, StringComparison.Ordinal)); + if (existsCached) + { + return RemoteCachedIdents.First(c => string.Equals(c.Value.Ident.Ident, characterIdent, StringComparison.Ordinal)).Key; + } + + return null; } - public async Task MarkUserOffline(string uid) + public void MarkUserOffline(string uid) { - OnlineClients.TryRemove(uid, out _); - _metrics.SetGaugeTo(MetricsAPI.GaugeAuthorizedConnections, OnlineClients.Count); - await ExecuteOnGrpc(_grpcIdentClient.RemoveIdentForUidAsync(new RemoveIdentMessage() { ServerId = _shardName, Uid = uid })).ConfigureAwait(false); + if (OnlineClients.TryRemove(uid, out var uidWithIdent)) + { + _metrics.SetGaugeTo(MetricsAPI.GaugeAuthorizedConnections, OnlineClients.Count); + _identChangeQueue.Enqueue(new IdentChange() + { + IsOnline = false, + UidWithIdent = uidWithIdent + }); + } } - public async Task MarkUserOnline(string uid, string charaIdent) + public void MarkUserOnline(string uid, string charaIdent) { - OnlineClients[uid] = charaIdent; + OnlineClients[uid] = new UidWithIdent() + { + Uid = new() + { + Uid = uid, + }, + Ident = new() + { + Ident = charaIdent, + ServerId = _shardName + } + }; + _metrics.SetGaugeTo(MetricsAPI.GaugeAuthorizedConnections, OnlineClients.Count); - await ExecuteOnGrpc(_grpcIdentClient.SetIdentForUidAsync(new SetIdentMessage() { Ident = charaIdent, ServerId = _shardName, Uid = uid })).ConfigureAwait(false); + _identChangeQueue.Enqueue(new IdentChange() + { + IsOnline = true, + UidWithIdent = OnlineClients[uid] + }); } public async Task StartAsync(CancellationToken cancellationToken) { - await ExecuteOnGrpc(_grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName })).ConfigureAwait(false); - - _ = HandleCacheVerification(_cacheVerificationCts.Token); + _ = RestartStreams(); + _ = CheckGrpcFaults(_faultCheckCts.Token); } public async Task StopAsync(CancellationToken cancellationToken) { - _cacheVerificationCts.Cancel(); + _faultCheckCts.Cancel(); + _streamCts.Cancel(); await ExecuteOnGrpc(_grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName })).ConfigureAwait(false); } + private async Task CheckGrpcFaults(CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + await CheckFaultStateAndResend().ConfigureAwait(false); + await Task.Delay(250).ConfigureAwait(false); + } + } + + private async Task RestartStreams() + { + _streamCts?.Cancel(); + _streamCts?.Dispose(); + _streamCts = new(); + if (!_grpcIsFaulty) + { + try + { + await _grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName }).ConfigureAwait(false); + + RemoteCachedIdents.Clear(); + _ = StreamOnlineClientData(_streamCts.Token); + _ = ReceiveOnlineClientData(_streamCts.Token); + var remoteOnlineClients = await _grpcIdentClient.GetAllIdentsAsync(new ServerMessage() + { + ServerId = _shardName + }).ConfigureAwait(false); + foreach (var result in remoteOnlineClients.UidWithIdent) + { + RemoteCachedIdents[result.Uid.Uid] = result; + } + } + catch + { + SetGrpcFaulty(); + } + } + } + + private async Task StreamOnlineClientData(CancellationToken cts) + { + try + { + var stream = _grpcIdentClient.SendStreamIdentStatusChange(cancellationToken: cts); + await stream.RequestStream.WriteAsync(new IdentChangeMessage() + { + Server = new ServerMessage() + { + ServerId = _shardName + } + }).ConfigureAwait(false); + + while (!cts.IsCancellationRequested) + { + if (_identChangeQueue.TryDequeue(out var result)) + { + await stream.RequestStream.WriteAsync(new() { IdentChange = result }, cts).ConfigureAwait(false); + } + else + { + await Task.Delay(25, cts).ConfigureAwait(false); + } + } + } + catch (OperationCanceledException) { return; } + catch + { + SetGrpcFaulty(); + } + } + + private async Task ReceiveOnlineClientData(CancellationToken cts) + { + try + { + var stream = _grpcIdentClient.ReceiveStreamIdentStatusChange(new ServerMessage() + { + ServerId = _shardName, + }); + while (await stream.ResponseStream.MoveNext(cts).ConfigureAwait(false)) + { + var cur = stream.ResponseStream.Current; + if (cur.IsOnline) + { + RemoteCachedIdents[cur.UidWithIdent.Uid.Uid] = cur.UidWithIdent; + } + else if (RemoteCachedIdents.TryGetValue(cur.UidWithIdent.Uid.Uid, out var existingIdent) + && string.Equals(existingIdent.Ident.ServerId, cur.UidWithIdent.Ident.ServerId, StringComparison.Ordinal)) + { + RemoteCachedIdents.TryRemove(cur.UidWithIdent.Uid.Uid, out _); + } + } + } + catch (OperationCanceledException) { return; } + catch + { + SetGrpcFaulty(); + } + } + private async Task InvokeOnGrpc(AsyncUnaryCall toExecute) { try @@ -188,13 +275,11 @@ public class GrpcClientIdentificationService : IHostedService _grpcIsFaulty = false; _logger.LogInformation("GRPC connection is restored, sending current server idents"); - await _grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName }).ConfigureAwait(false); + await RestartStreams().ConfigureAwait(false); var msg = new ServerIdentMessage(); msg.Idents.AddRange(OnlineClients.Select(c => new SetIdentMessage() { - Ident = c.Value, - Uid = c.Key, - ServerId = _shardName + UidWithIdent = c.Value })); await _grpcIdentClient.RecreateServerIdentsAsync(msg).ConfigureAwait(false); } diff --git a/MareSynchronosServer/MareSynchronosServices/Identity/IdentityHandler.cs b/MareSynchronosServer/MareSynchronosServices/Identity/IdentityHandler.cs index 0ee24e9..782caf9 100644 --- a/MareSynchronosServer/MareSynchronosServices/Identity/IdentityHandler.cs +++ b/MareSynchronosServer/MareSynchronosServices/Identity/IdentityHandler.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -45,6 +46,16 @@ internal class IdentityHandler return cachedIdentities.Count(c => c.Value.ServerId == serverId); } + internal Dictionary GetIdentsForAllExcept(string serverId) + { + return cachedIdentities.Where(k => k.Value.ServerId != serverId).ToDictionary(k => k.Key, k => k.Value); + } + + internal Dictionary GetIdentsForServer(string serverId) + { + return cachedIdentities.Where(k => k.Value.ServerId == serverId).ToDictionary(k => k.Key, k => k.Value); + } + internal void ClearIdentsForServer(string serverId) { var serverIdentities = cachedIdentities.Where(i => i.Value.ServerId == serverId); diff --git a/MareSynchronosServer/MareSynchronosServices/Identity/IdentityService.cs b/MareSynchronosServer/MareSynchronosServices/Identity/IdentityService.cs index 7e33118..13823ff 100644 --- a/MareSynchronosServer/MareSynchronosServices/Identity/IdentityService.cs +++ b/MareSynchronosServer/MareSynchronosServices/Identity/IdentityService.cs @@ -1,6 +1,9 @@ using Grpc.Core; using MareSynchronosShared.Protos; using Microsoft.Extensions.Logging; +using System.Collections.Concurrent; +using System.Linq; +using System.Runtime.ConstrainedExecution; using System.Threading.Tasks; namespace MareSynchronosServices.Identity; @@ -9,6 +12,7 @@ internal class IdentityService : IdentificationService.IdentificationServiceBase { private readonly ILogger _logger; private readonly IdentityHandler _handler; + private readonly ConcurrentDictionary> identChanges = new(); public IdentityService(ILogger logger, IdentityHandler handler) { @@ -16,18 +20,6 @@ internal class IdentityService : IdentificationService.IdentificationServiceBase _handler = handler; } - public override Task RemoveIdentForUid(RemoveIdentMessage request, ServerCallContext context) - { - _handler.RemoveIdent(request.Uid, request.ServerId); - return Task.FromResult(new Empty()); - } - - public override Task SetIdentForUid(SetIdentMessage request, ServerCallContext context) - { - _handler.SetIdent(request.Uid, request.ServerId, request.Ident); - return Task.FromResult(new Empty()); - } - public override async Task GetIdentForUid(UidMessage request, ServerCallContext context) { var result = await _handler.GetIdentForuid(request.Uid); @@ -38,15 +30,6 @@ internal class IdentityService : IdentificationService.IdentificationServiceBase }; } - public override async Task GetUidForCharacterIdent(CharacterIdentMessage request, ServerCallContext context) - { - var result = await _handler.GetUidForCharacterIdent(request.Ident, request.ServerId); - return new UidMessage() - { - Uid = result - }; - } - public override Task GetOnlineUserCount(ServerMessage request, ServerCallContext context) { return Task.FromResult(new OnlineUserCountResponse() { Count = _handler.GetOnlineUsers(request.ServerId) }); @@ -54,6 +37,23 @@ internal class IdentityService : IdentificationService.IdentificationServiceBase public override Task ClearIdentsForServer(ServerMessage request, ServerCallContext context) { + var idents = _handler.GetIdentsForServer(request.ServerId); + foreach (var entry in idents) + { + EnqueueIdentOffline(new UidWithIdent() + { + Ident = new CharacterIdentMessage() + { + Ident = entry.Value.CharacterIdent, + ServerId = entry.Value.ServerId + }, + Uid = new UidMessage() + { + Uid = entry.Key + } + }); + } + _handler.ClearIdentsForServer(request.ServerId); return Task.FromResult(new Empty()); } @@ -62,27 +62,113 @@ internal class IdentityService : IdentificationService.IdentificationServiceBase { foreach (var identMsg in request.Idents) { - _handler.SetIdent(identMsg.Uid, identMsg.ServerId, identMsg.Ident); + _handler.SetIdent(identMsg.UidWithIdent.Uid.Uid, identMsg.UidWithIdent.Ident.ServerId, identMsg.UidWithIdent.Ident.Ident); + EnqueueIdentOnline(identMsg.UidWithIdent); } return Task.FromResult(new Empty()); } - public override async Task ValidateCachedIdents(MultiUidMessage request, ServerCallContext context) + public override async Task SendStreamIdentStatusChange(IAsyncStreamReader requestStream, ServerCallContext context) { - UidWithIdentMessage response = new UidWithIdentMessage(); - foreach (var msg in request.Uids) + await requestStream.MoveNext(); + var server = requestStream.Current.Server; + if (server == null) throw new System.Exception("First message needs to be server message"); + _logger.LogInformation("Registered Server " + server.ServerId + " input stream"); + identChanges[server.ServerId] = new ConcurrentQueue(); + while (await requestStream.MoveNext().ConfigureAwait(false)) { - UidWithIdent msgResp = new() + var cur = requestStream.Current.IdentChange; + if (cur == null) throw new System.Exception("Expected client ident change"); + EnqueueIdentChange(cur); + + if (cur.IsOnline) { - Uid = msg, - Ident = new() - }; - var ident = await _handler.GetIdentForuid(msg.Uid); - msgResp.Ident.Ident = ident.CharacterIdent; - msgResp.Ident.ServerId = ident.ServerId; - response.UidWithIdent.Add(msgResp); + _handler.SetIdent(cur.UidWithIdent.Uid.Uid, cur.UidWithIdent.Ident.ServerId, cur.UidWithIdent.Ident.Ident); + } + else + { + _handler.RemoveIdent(cur.UidWithIdent.Uid.Uid, cur.UidWithIdent.Ident.ServerId); + } } - return response; + _logger.LogInformation("Server input stream from " + server + " finished"); + + return new Empty(); + } + + public override async Task ReceiveStreamIdentStatusChange(ServerMessage request, IServerStreamWriter responseStream, ServerCallContext context) + { + var server = request.ServerId; + _logger.LogInformation("Registered Server " + server + " output stream"); + + try + { + while (true) + { + if (identChanges.ContainsKey(server) && identChanges[server].TryDequeue(out var cur)) + { + _logger.LogInformation("Sending " + cur.UidWithIdent.Uid.Uid + " to " + server); + await responseStream.WriteAsync(cur).ConfigureAwait(false); + } + else + { + await Task.Delay(10).ConfigureAwait(false); + } + } + } + catch + { + _logger.LogInformation("Server output stream to " + server + " finished or faulty"); + } + } + + public override Task GetAllIdents(ServerMessage request, ServerCallContext context) + { + var response = new UidWithIdentMessage(); + foreach (var item in _handler.GetIdentsForAllExcept(request.ServerId)) + { + response.UidWithIdent.Add(new UidWithIdent() + { + Uid = new UidMessage() + { + Uid = item.Key + }, + Ident = new CharacterIdentMessage() + { + Ident = item.Value.CharacterIdent, + ServerId = item.Value.ServerId + } + }); + } + + return Task.FromResult(response); + } + + private void EnqueueIdentChange(IdentChange identchange) + { + _logger.LogInformation("Enqueued " + identchange.UidWithIdent.Uid.Uid + " from " + identchange.UidWithIdent.Ident.ServerId); + + foreach (var dict in identChanges.Where(k => k.Key != identchange.UidWithIdent.Ident.ServerId)) + { + dict.Value.Enqueue(identchange); + } + } + + private void EnqueueIdentOnline(UidWithIdent ident) + { + EnqueueIdentChange(new IdentChange() + { + IsOnline = true, + UidWithIdent = ident + }); + } + + private void EnqueueIdentOffline(UidWithIdent ident) + { + EnqueueIdentChange(new IdentChange() + { + IsOnline = false, + UidWithIdent = ident + }); } } \ No newline at end of file diff --git a/MareSynchronosServer/MareSynchronosShared/Protos/mareservices.proto b/MareSynchronosServer/MareSynchronosShared/Protos/mareservices.proto index 338eaac..913a5e3 100644 --- a/MareSynchronosServer/MareSynchronosShared/Protos/mareservices.proto +++ b/MareSynchronosServer/MareSynchronosShared/Protos/mareservices.proto @@ -18,13 +18,12 @@ service FileService { service IdentificationService { rpc GetOnlineUserCount (ServerMessage) returns (OnlineUserCountResponse); - rpc RemoveIdentForUid (RemoveIdentMessage) returns (Empty); - rpc SetIdentForUid (SetIdentMessage) returns (Empty); - rpc GetUidForCharacterIdent (CharacterIdentMessage) returns (UidMessage); rpc GetIdentForUid (UidMessage) returns (CharacterIdentMessage); rpc ClearIdentsForServer (ServerMessage) returns (Empty); rpc RecreateServerIdents (ServerIdentMessage) returns (Empty); - rpc ValidateCachedIdents (MultiUidMessage) returns (UidWithIdentMessage); + rpc GetAllIdents (ServerMessage) returns (UidWithIdentMessage); + rpc SendStreamIdentStatusChange (stream IdentChangeMessage) returns (Empty); + rpc ReceiveStreamIdentStatusChange (ServerMessage) returns (stream IdentChange); } message Empty { } @@ -33,6 +32,22 @@ message MultiUidMessage { repeated UidMessage uids = 1; } +message ServerIdentMessage { + repeated SetIdentMessage idents = 1; +} + +message IdentChangeMessage { + oneof payload { + ServerMessage server = 1; + IdentChange identChange = 2; + } +} + +message IdentChange { + UidWithIdent uidWithIdent = 1; + bool isOnline = 2; +} + message UidWithIdentMessage { repeated UidWithIdent uidWithIdent = 1; } @@ -42,10 +57,6 @@ message UidWithIdent { CharacterIdentMessage ident = 2; } -message ServerIdentMessage { - repeated SetIdentMessage idents = 1; -} - message UidMessage { string uid = 1; } @@ -64,9 +75,7 @@ message RemoveIdentMessage { } message SetIdentMessage { - string uid = 1; - string server_id = 2; - string ident = 3; + UidWithIdent uidWithIdent = 1; } message CharacterIdentMessage {