attempt to switch ident service to redis

This commit is contained in:
rootdarkarchon
2023-01-08 14:51:56 +01:00
parent 61111d387a
commit a182f36485
23 changed files with 118 additions and 683 deletions

View File

@@ -1,6 +1,5 @@
using MareSynchronos.API;
using MareSynchronosServer.Authentication;
using MareSynchronosServer.Services;
using MareSynchronosShared;
using MareSynchronosShared.Data;
using MareSynchronosShared.Services;
@@ -9,6 +8,7 @@ using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Microsoft.IdentityModel.Tokens;
using StackExchange.Redis;
using System.IdentityModel.Tokens.Jwt;
using System.Security.Claims;
using System.Text;
@@ -20,21 +20,21 @@ namespace MareSynchronosServer.Controllers;
public class JwtController : Controller
{
private readonly IHttpContextAccessor _accessor;
private readonly IDatabase _redis;
private readonly MareDbContext _mareDbContext;
private readonly SecretKeyAuthenticatorService _secretKeyAuthenticatorService;
private readonly IConfigurationService<MareConfigurationAuthBase> _configuration;
private readonly IClientIdentificationService _clientIdentService;
public JwtController(IHttpContextAccessor accessor, MareDbContext mareDbContext,
SecretKeyAuthenticatorService secretKeyAuthenticatorService,
IConfigurationService<MareConfigurationAuthBase> configuration,
IClientIdentificationService clientIdentService)
IConnectionMultiplexer connectionMultiplexer)
{
_accessor = accessor;
_redis = connectionMultiplexer.GetDatabase();
_mareDbContext = mareDbContext;
_secretKeyAuthenticatorService = secretKeyAuthenticatorService;
_configuration = configuration;
_clientIdentService = clientIdentService;
}
[AllowAnonymous]
@@ -54,7 +54,7 @@ public class JwtController : Controller
if (!authResult.Success && !authResult.TempBan) return Unauthorized("The provided secret key is invalid. Verify your accounts existence and/or recover the secret key.");
if (!authResult.Success && authResult.TempBan) return Unauthorized("You are temporarily banned. Try connecting again later.");
var existingIdent = _clientIdentService.GetCharacterIdentForUid(authResult.Uid);
var existingIdent = await _redis.StringGetAsync("UID:" + authResult.Uid);
if (!string.IsNullOrEmpty(existingIdent)) return Unauthorized("Already logged in to this account.");
var token = CreateToken(new List<Claim>()

View File

@@ -82,12 +82,12 @@ public partial class MareHub
public async Task<List<OnlineUserDto>> AdminGetOnlineUsers()
{
var users = await _dbContext.Users.AsNoTracking().ToListAsync().ConfigureAwait(false);
return users.Where(c => !string.IsNullOrEmpty(_clientIdentService.GetCharacterIdentForUid(c.UID))).Select(b => new OnlineUserDto
return users.Select(user => new { user, GetIdentFromUidFromRedis(user.UID).Result }).Where(a => !string.IsNullOrEmpty(a.Result)).Select(b => new OnlineUserDto
{
CharacterNameHash = _clientIdentService.GetCharacterIdentForUid(b.UID),
UID = b.UID,
IsModerator = b.IsModerator,
IsAdmin = b.IsAdmin
CharacterNameHash = b.Result,
UID = b.user.UID,
IsModerator = b.user.IsModerator,
IsAdmin = b.user.IsAdmin
}).ToList();
}

View File

@@ -7,6 +7,27 @@ namespace MareSynchronosServer.Hubs;
public partial class MareHub
{
private async Task UpdateUserOnRedis()
{
await _redis.StringSetAsync("UID:" + UserUID, UserCharaIdent, TimeSpan.FromSeconds(60), flags: StackExchange.Redis.CommandFlags.FireAndForget).ConfigureAwait(false);
await _redis.StringSetAsync("IDENT:" + UserCharaIdent, UserUID, TimeSpan.FromSeconds(60), flags: StackExchange.Redis.CommandFlags.FireAndForget).ConfigureAwait(false);
}
private async Task RemoveUserFromRedis()
{
await _redis.StringGetDeleteAsync("UID:" + UserUID).ConfigureAwait(false);
await _redis.StringGetDeleteAsync("IDENT:" + UserCharaIdent).ConfigureAwait(false);
}
public async Task<string> GetIdentFromUidFromRedis(string uid)
{
return await _redis.StringGetAsync("UID:" + uid).ConfigureAwait(false);
}
public async Task<string> GetUidFromIdentFromRedis(string ident)
{
return await _redis.StringGetAsync("IDENT:" + ident).ConfigureAwait(false);
}
private async Task<List<PausedEntry>> GetAllPairedClientsWithPauseState(string? uid = null)
{
uid ??= UserUID;
@@ -81,7 +102,7 @@ public partial class MareHub
if (userPair.IsPausedPerGroup is PauseInfo.Unpaused) return;
}
var groupUserIdent = _clientIdentService.GetCharacterIdentForUid(groupUserPair.GroupUserUID);
var groupUserIdent = await GetIdentFromUidFromRedis(groupUserPair.GroupUserUID).ConfigureAwait(false);
if (!string.IsNullOrEmpty(groupUserIdent))
{
await Clients.User(uid).Client_UserChangePairedPlayer(groupUserIdent, false).ConfigureAwait(false);
@@ -93,7 +114,7 @@ public partial class MareHub
{
foreach (var pair in groupUsers)
{
var pairIdent = _clientIdentService.GetCharacterIdentForUid(pair.GroupUserUID);
var pairIdent = await GetIdentFromUidFromRedis(pair.GroupUserUID).ConfigureAwait(false);
if (string.IsNullOrEmpty(pairIdent)) continue;
var pairs = await GetAllPairedClientsWithPauseState(pair.GroupUserUID).ConfigureAwait(false);

View File

@@ -232,7 +232,7 @@ public partial class MareHub
if (userPair.IsPausedExcludingGroup(gid) is PauseInfo.Unpaused) continue;
if (userPair.IsPausedPerGroup is PauseInfo.Paused) continue;
var groupUserIdent = _clientIdentService.GetCharacterIdentForUid(groupUserPair.GroupUserUID);
var groupUserIdent = await GetIdentFromUidFromRedis(groupUserPair.GroupUserUID).ConfigureAwait(false);
if (!string.IsNullOrEmpty(groupUserIdent))
{
await Clients.User(UserUID).Client_UserChangePairedPlayer(groupUserIdent, true).ConfigureAwait(false);
@@ -403,7 +403,7 @@ public partial class MareHub
if (userPair.IsOtherPausedForSpecificGroup(gid) is PauseInfo.Paused) continue;
}
var groupUserIdent = _clientIdentService.GetCharacterIdentForUid(groupUserPair.GroupUserUID);
var groupUserIdent = await GetIdentFromUidFromRedis(groupUserPair.GroupUserUID).ConfigureAwait(false);
if (!string.IsNullOrEmpty(groupUserIdent))
{
await Clients.User(UserUID).Client_UserChangePairedPlayer(groupUserIdent, !isPaused).ConfigureAwait(false);
@@ -437,7 +437,7 @@ public partial class MareHub
UserUID = uid,
}).ConfigureAwait(false);
var userIdent = _clientIdentService.GetCharacterIdentForUid(uid);
var userIdent = await GetIdentFromUidFromRedis(uid).ConfigureAwait(false);
if (userIdent == null) return;
await Clients.User(uid).Client_GroupChange(new GroupDto()
@@ -683,7 +683,7 @@ public partial class MareHub
UserUID = pair.GroupUserUID
}).ConfigureAwait(false);
var pairIdent = _clientIdentService.GetCharacterIdentForUid(pair.GroupUserUID);
var pairIdent = await GetIdentFromUidFromRedis(pair.GroupUserUID).ConfigureAwait(false);
if (string.IsNullOrEmpty(pairIdent)) continue;
var allUserPairs = await GetAllPairedClientsWithPauseState(pair.GroupUserUID).ConfigureAwait(false);

View File

@@ -64,7 +64,7 @@ public partial class MareHub
_logger.LogCallInfo();
var usersToSendOnlineTo = await SendOnlineToAllPairedUsers(UserCharaIdent).ConfigureAwait(false);
return usersToSendOnlineTo.Select(e => _clientIdentService.GetCharacterIdentForUid(e)).Where(t => !string.IsNullOrEmpty(t)).Distinct(StringComparer.Ordinal).ToList();
return usersToSendOnlineTo.Select(e => GetIdentFromUidFromRedis(e).Result).Where(t => !string.IsNullOrEmpty(t)).Distinct(StringComparer.Ordinal).ToList();
}
[Authorize(Policy = "Identified")]
@@ -147,7 +147,7 @@ public partial class MareHub
var allPairedUsers = await GetAllPairedUnpausedUsers().ConfigureAwait(false);
var allPairedUsersDict = allPairedUsers.ToDictionary(f => f, f => _clientIdentService.GetCharacterIdentForUid(f), StringComparer.Ordinal)
var allPairedUsersDict = allPairedUsers.ToDictionary(f => f, f => GetIdentFromUidFromRedis(f).Result, StringComparer.Ordinal)
.Where(f => visibleCharacterIds.Contains(f.Value, StringComparer.Ordinal));
_logger.LogCallInfo(MareHubLogger.Args(visibleCharacterIds.Count, allPairedUsersDict.Count()));
@@ -209,7 +209,7 @@ public partial class MareHub
if (otherEntry == null) return;
// check if other user is online
var otherIdent = _clientIdentService.GetCharacterIdentForUid(otherUser.UID);
var otherIdent = await GetIdentFromUidFromRedis(otherUser.UID).ConfigureAwait(false);
if (otherIdent == null) return;
// send push with update to other user if other user is online
@@ -224,7 +224,7 @@ public partial class MareHub
}).ConfigureAwait(false);
// get own ident and all pairs
var userIdent = _clientIdentService.GetCharacterIdentForUid(user.UID);
var userIdent = await GetIdentFromUidFromRedis(user.UID).ConfigureAwait(false);
var allUserPairs = await GetAllPairedClientsWithPauseState().ConfigureAwait(false);
// if the other user has paused the main user and there was no previous group connection don't send anything
@@ -270,7 +270,7 @@ public partial class MareHub
IsSynced = true
}).ConfigureAwait(false);
var otherCharaIdent = _clientIdentService.GetCharacterIdentForUid(pair.OtherUserUID);
var otherCharaIdent = await GetIdentFromUidFromRedis(pair.OtherUserUID).ConfigureAwait(false);
if (UserCharaIdent == null || otherCharaIdent == null || otherEntry.IsPaused) return;
@@ -310,7 +310,7 @@ public partial class MareHub
if (oppositeClientPair == null) return;
// check if other user is online, if no then there is no need to do anything further
var otherIdent = _clientIdentService.GetCharacterIdentForUid(otherUserUid);
var otherIdent = await GetIdentFromUidFromRedis(otherUserUid).ConfigureAwait(false);
if (otherIdent == null) return;
// get own ident and

View File

@@ -9,6 +9,7 @@ using MareSynchronosShared.Services;
using MareSynchronosShared.Utils;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR;
using StackExchange.Redis;
namespace MareSynchronosServer.Hubs;
@@ -19,7 +20,6 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
private readonly FileService.FileServiceClient _fileServiceClient;
private readonly SystemInfoService _systemInfoService;
private readonly IHttpContextAccessor _contextAccessor;
private readonly IClientIdentificationService _clientIdentService;
private readonly MareHubLogger _logger;
private readonly MareDbContext _dbContext;
private readonly Uri _mainCdnFullUrl;
@@ -28,11 +28,12 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
private readonly int _maxJoinedGroupsByUser;
private readonly int _maxGroupUserCount;
private readonly IConfigurationService<ServerConfiguration> _configurationService;
private readonly IDatabase _redis;
public MareHub(MareMetrics mareMetrics, FileService.FileServiceClient fileServiceClient,
MareDbContext mareDbContext, ILogger<MareHub> logger, SystemInfoService systemInfoService,
IConfigurationService<ServerConfiguration> configuration, IHttpContextAccessor contextAccessor,
IClientIdentificationService clientIdentService)
IConnectionMultiplexer connectionMultiplexer)
{
_mareMetrics = mareMetrics;
_fileServiceClient = fileServiceClient;
@@ -44,7 +45,7 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
_maxJoinedGroupsByUser = configuration.GetValueOrDefault(nameof(ServerConfiguration.MaxJoinedGroupsByUser), 6);
_maxGroupUserCount = configuration.GetValueOrDefault(nameof(ServerConfiguration.MaxGroupUserCount), 100);
_contextAccessor = contextAccessor;
_clientIdentService = clientIdentService;
_redis = connectionMultiplexer.GetDatabase();
_logger = new MareHubLogger(this, logger);
_dbContext = mareDbContext;
}
@@ -92,14 +93,9 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
[Authorize(Policy = "Authenticated")]
public async Task<bool> CheckClientHealth()
{
var needsReconnect = !_clientIdentService.IsOnCurrentServer(UserUID);
if (needsReconnect)
{
await Clients.Caller.Client_ReceiveServerMessage(MessageSeverity.Warning, "Internal server state corruption detected, reconnecting you automatically to fix the issue.").ConfigureAwait(false);
_logger.LogCallWarning(MareHubLogger.Args(needsReconnect));
}
await UpdateUserOnRedis().ConfigureAwait(false);
return needsReconnect;
return false;
}
[Authorize(Policy = "Authenticated")]
@@ -111,7 +107,7 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
{
_logger.LogCallInfo(MareHubLogger.Args(_contextAccessor.GetIpAddress(), UserCharaIdent));
_clientIdentService.MarkUserOnline(UserUID, UserCharaIdent);
await UpdateUserOnRedis().ConfigureAwait(false);
}
catch { }
@@ -127,7 +123,7 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
{
_logger.LogCallInfo(MareHubLogger.Args(_contextAccessor.GetIpAddress(), UserCharaIdent));
_clientIdentService.MarkUserOffline(UserUID);
await RemoveUserFromRedis().ConfigureAwait(false);
await SendOfflineToAllPairedUsers(UserCharaIdent).ConfigureAwait(false);

View File

@@ -1,98 +0,0 @@
using MareSynchronosShared.Protos;
using System.Collections.Concurrent;
namespace MareSynchronosServer.Identity;
public class IdentityHandler
{
private readonly ConcurrentDictionary<string, ServerIdentity> _cachedIdentities = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, ConcurrentQueue<IdentChange>> _identChanges = new(StringComparer.Ordinal);
private readonly ILogger<IdentityHandler> _logger;
public IdentityHandler(ILogger<IdentityHandler> logger)
{
_logger = logger;
}
internal Task<ServerIdentity> GetIdentForUid(string uid)
{
if (!_cachedIdentities.TryGetValue(uid, out ServerIdentity result))
{
result = new ServerIdentity();
}
return Task.FromResult(result);
}
internal void SetIdent(string uid, string serverId, string ident)
{
_cachedIdentities[uid] = new ServerIdentity() { ServerId = serverId, CharacterIdent = ident };
}
internal void RemoveIdent(string uid, string serverId)
{
if (_cachedIdentities.ContainsKey(uid) && string.Equals(_cachedIdentities[uid].ServerId, serverId, StringComparison.Ordinal))
{
_cachedIdentities.TryRemove(uid, out _);
}
}
internal int GetOnlineUsers(string serverId)
{
if (string.IsNullOrEmpty(serverId))
return _cachedIdentities.Count;
return _cachedIdentities.Count(c => string.Equals(c.Value.ServerId, serverId, StringComparison.Ordinal));
}
internal Dictionary<string, ServerIdentity> GetIdentsForAllExcept(string serverId)
{
return _cachedIdentities.Where(k => !string.Equals(k.Value.ServerId, serverId, StringComparison.Ordinal)).ToDictionary(k => k.Key, k => k.Value, StringComparer.Ordinal);
}
internal Dictionary<string, ServerIdentity> GetIdentsForServer(string serverId)
{
return _cachedIdentities.Where(k => string.Equals(k.Value.ServerId, serverId, StringComparison.Ordinal)).ToDictionary(k => k.Key, k => k.Value, StringComparer.Ordinal);
}
internal void ClearIdentsForServer(string serverId)
{
var serverIdentities = _cachedIdentities.Where(i => string.Equals(i.Value.ServerId, serverId, StringComparison.Ordinal));
foreach (var identity in serverIdentities)
{
_cachedIdentities.TryRemove(identity.Key, out _);
}
}
internal void EnqueueIdentChange(IdentChange identchange)
{
_logger.LogDebug("Enqueued " + identchange.UidWithIdent.Uid.Uid + ":" + identchange.IsOnline + " from " + identchange.UidWithIdent.Ident.ServerId);
foreach (var k in _identChanges.Keys)
{
if (string.Equals(k, identchange.UidWithIdent.Ident.ServerId, StringComparison.Ordinal)) continue;
_identChanges[k].Enqueue(identchange);
}
}
internal bool DequeueIdentChange(string server, out IdentChange cur)
{
if (!(_identChanges.ContainsKey(server) && _identChanges[server].TryDequeue(out cur)))
{
cur = null;
return false;
}
return true;
}
internal void RegisterServerForQueue(string serverId)
{
_identChanges[serverId] = new ConcurrentQueue<IdentChange>();
}
internal record ServerIdentity
{
public string ServerId { get; set; } = string.Empty;
public string CharacterIdent { get; set; } = string.Empty;
}
}

View File

@@ -38,6 +38,7 @@
<PackageReference Include="Microsoft.Extensions.Hosting.Systemd" Version="7.0.0" />
<PackageReference Include="Microsoft.IdentityModel.Tokens" Version="6.25.1" />
<PackageReference Include="prometheus-net.AspNetCore" Version="7.0.0" />
<PackageReference Include="StackExchange.Redis" Version="2.6.86" />
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="6.25.1" />
</ItemGroup>
@@ -46,4 +47,8 @@
<ProjectReference Include="..\MareSynchronosShared\MareSynchronosShared.csproj" />
</ItemGroup>
<ItemGroup>
<Folder Include="Identity\" />
</ItemGroup>
</Project>

View File

@@ -2,22 +2,22 @@
using Microsoft.AspNetCore.SignalR;
using MareSynchronosShared.Data;
using Microsoft.EntityFrameworkCore;
using MareSynchronosServer.Services;
using MareSynchronosShared.Utils;
using StackExchange.Redis;
namespace MareSynchronosServer.RequirementHandlers;
public class UserRequirementHandler : AuthorizationHandler<UserRequirement, HubInvocationContext>
{
private readonly IClientIdentificationService identClient;
private readonly MareDbContext dbContext;
private readonly ILogger<UserRequirementHandler> logger;
private readonly MareDbContext _dbContext;
private readonly ILogger<UserRequirementHandler> _logger;
private readonly IDatabase _redis;
public UserRequirementHandler(IClientIdentificationService identClient, MareDbContext dbContext, ILogger<UserRequirementHandler> logger)
public UserRequirementHandler(MareDbContext dbContext, ILogger<UserRequirementHandler> logger, IConnectionMultiplexer connectionMultiplexer)
{
this.identClient = identClient;
this.dbContext = dbContext;
this.logger = logger;
_dbContext = dbContext;
_logger = logger;
_redis = connectionMultiplexer.GetDatabase();
}
protected override async Task HandleRequirementAsync(AuthorizationHandlerContext context, UserRequirement requirement, HubInvocationContext resource)
@@ -28,25 +28,22 @@ public class UserRequirementHandler : AuthorizationHandler<UserRequirement, HubI
if ((requirement.Requirements & UserRequirements.Identified) is UserRequirements.Identified)
{
var ident = identClient.GetCharacterIdentForUid(uid);
if (ident == null) context.Fail();
var isOnCurrent = identClient.IsOnCurrentServer(uid);
if (!isOnCurrent) identClient.MarkUserOnline(uid, ident);
var ident = await _redis.StringGetAsync("UID:" + uid).ConfigureAwait(false);
if (ident == RedisValue.EmptyString) context.Fail();
}
if ((requirement.Requirements & UserRequirements.Administrator) is UserRequirements.Administrator)
{
var user = await dbContext.Users.AsNoTracking().SingleOrDefaultAsync(b => b.UID == uid).ConfigureAwait(false);
var user = await _dbContext.Users.AsNoTracking().SingleOrDefaultAsync(b => b.UID == uid).ConfigureAwait(false);
if (user == null || !user.IsAdmin) context.Fail();
logger.LogInformation("Admin {uid} authenticated", uid);
_logger.LogInformation("Admin {uid} authenticated", uid);
}
if ((requirement.Requirements & UserRequirements.Moderator) is UserRequirements.Moderator)
{
var user = await dbContext.Users.AsNoTracking().SingleOrDefaultAsync(b => b.UID == uid).ConfigureAwait(false);
var user = await _dbContext.Users.AsNoTracking().SingleOrDefaultAsync(b => b.UID == uid).ConfigureAwait(false);
if (user == null || !user.IsAdmin && !user.IsModerator) context.Fail();
logger.LogInformation("Admin/Moderator {uid} authenticated", uid);
_logger.LogInformation("Admin/Moderator {uid} authenticated", uid);
}
context.Succeed(requirement);

View File

@@ -1,241 +0,0 @@
using Grpc.Core;
using MareSynchronosShared.Metrics;
using MareSynchronosShared.Protos;
using MareSynchronosShared.Services;
using MareSynchronosShared.Utils;
using System.Collections.Concurrent;
namespace MareSynchronosServer.Services;
public class GrpcClientIdentificationService : GrpcBaseService, IClientIdentificationService
{
private readonly string _shardName;
private readonly ILogger<GrpcClientIdentificationService> _logger;
private readonly IdentificationService.IdentificationServiceClient _grpcIdentClient;
private readonly IdentificationService.IdentificationServiceClient _grpcIdentClientStreamOut;
private readonly IdentificationService.IdentificationServiceClient _grpcIdentClientStreamIn;
private readonly MareMetrics _metrics;
protected readonly ConcurrentDictionary<string, UidWithIdent> OnlineClients = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, UidWithIdent> RemoteCachedIdents = new(StringComparer.Ordinal);
private readonly ConcurrentQueue<IdentChange> _identChangeQueue = new();
public GrpcClientIdentificationService(ILogger<GrpcClientIdentificationService> logger,
IdentificationService.IdentificationServiceClient gprcIdentClient,
IdentificationService.IdentificationServiceClient gprcIdentClientStreamOut,
IdentificationService.IdentificationServiceClient gprcIdentClientStreamIn,
MareMetrics metrics, IConfigurationService<ServerConfiguration> configuration) : base(logger)
{
_shardName = configuration.GetValueOrDefault(nameof(ServerConfiguration.ShardName), string.Empty);
_logger = logger;
_grpcIdentClient = gprcIdentClient;
_grpcIdentClientStreamOut = gprcIdentClientStreamOut;
_grpcIdentClientStreamIn = gprcIdentClientStreamIn;
_metrics = metrics;
}
public bool IsOnCurrentServer(string uid)
{
return OnlineClients.ContainsKey(uid);
}
public string? GetCharacterIdentForUid(string uid)
{
if (OnlineClients.TryGetValue(uid, out var ident))
{
return ident.Ident.Ident;
}
if (RemoteCachedIdents.TryGetValue(uid, out var cachedIdent))
{
return cachedIdent.Ident.Ident;
}
return null;
}
public string? GetServerForUid(string uid)
{
if (OnlineClients.ContainsKey(uid))
{
return _shardName;
}
if (RemoteCachedIdents.TryGetValue(uid, out var cachedIdent))
{
return cachedIdent.Ident.ServerId;
}
return null;
}
public async Task<long> GetOnlineUsers()
{
var result = await InvokeOnGrpc(_grpcIdentClient.GetOnlineUserCountAsync(new ServerMessage())).ConfigureAwait(false);
if (result == default(OnlineUserCountResponse)) return OnlineClients.Count;
return result.Count;
}
public string? GetUidForCharacterIdent(string characterIdent)
{
bool existsLocal = OnlineClients.Any(o => string.Equals(o.Value.Ident.Ident, characterIdent, StringComparison.Ordinal));
if (existsLocal)
{
return OnlineClients.First(c => string.Equals(c.Value.Ident.Ident, characterIdent, StringComparison.Ordinal)).Key;
}
bool existsCached = RemoteCachedIdents.Any(o => string.Equals(o.Value.Ident.Ident, characterIdent, StringComparison.Ordinal));
if (existsCached)
{
return RemoteCachedIdents.First(c => string.Equals(c.Value.Ident.Ident, characterIdent, StringComparison.Ordinal)).Key;
}
return null;
}
public void MarkUserOffline(string uid)
{
_metrics.SetGaugeTo(MetricsAPI.GaugeAuthorizedConnections, OnlineClients.Count);
if (OnlineClients.TryRemove(uid, out var uidWithIdent))
{
_identChangeQueue.Enqueue(new IdentChange()
{
IsOnline = false,
UidWithIdent = uidWithIdent
});
}
}
public void MarkUserOnline(string uid, string charaIdent)
{
OnlineClients[uid] = new UidWithIdent()
{
Uid = new()
{
Uid = uid,
},
Ident = new()
{
Ident = charaIdent,
ServerId = _shardName
}
};
_metrics.SetGaugeTo(MetricsAPI.GaugeAuthorizedConnections, OnlineClients.Count);
_identChangeQueue.Enqueue(new IdentChange()
{
IsOnline = true,
UidWithIdent = OnlineClients[uid]
});
}
private async Task StreamOnlineClientData(CancellationToken cts)
{
try
{
using var stream = _grpcIdentClientStreamOut.SendStreamIdentStatusChange(cancellationToken: cts);
_logger.LogInformation("Starting Send Online Client Data stream");
await stream.RequestStream.WriteAsync(new IdentChangeMessage()
{
Server = new ServerMessage()
{
ServerId = _shardName
}
}, cts).ConfigureAwait(false);
while (!cts.IsCancellationRequested)
{
if (_identChangeQueue.TryDequeue(out var result))
{
await stream.RequestStream.WriteAsync(new() { IdentChange = result }, cts).ConfigureAwait(false);
}
else
{
await Task.Delay(10, cts).ConfigureAwait(false);
}
}
}
catch (OperationCanceledException) { return; }
catch
{
SetGrpcFaulty();
}
}
private async Task ReceiveOnlineClientData(CancellationToken cts)
{
try
{
using var stream = _grpcIdentClientStreamIn.ReceiveStreamIdentStatusChange(new ServerMessage()
{
ServerId = _shardName,
}, cancellationToken: cts);
_logger.LogInformation("Starting Receive Online Client Data stream");
await foreach (var cur in stream.ResponseStream.ReadAllAsync(cts).ConfigureAwait(false))
{
OnlineClients.Remove(cur.UidWithIdent.Uid.Uid, out _);
if (cur.IsOnline)
{
RemoteCachedIdents[cur.UidWithIdent.Uid.Uid] = cur.UidWithIdent;
}
else if (RemoteCachedIdents.TryGetValue(cur.UidWithIdent.Uid.Uid, out var existingIdent)
&& string.Equals(existingIdent.Ident.ServerId, cur.UidWithIdent.Ident.ServerId, StringComparison.Ordinal))
{
RemoteCachedIdents.TryRemove(cur.UidWithIdent.Uid.Uid, out _);
}
}
_logger.LogCritical("Receive Online Client Data Stream ended");
}
catch (OperationCanceledException) { return; }
catch
{
SetGrpcFaulty();
}
}
protected override Task StartAsyncInternal(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
protected override async Task StopAsyncInternal(CancellationToken cancellationToken)
{
await ExecuteOnGrpc(_grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName }, cancellationToken: cancellationToken)).ConfigureAwait(false);
}
protected override async Task OnGrpcRestore()
{
var msg = new ServerIdentMessage();
msg.Idents.AddRange(OnlineClients.Select(c => new SetIdentMessage()
{
UidWithIdent = c.Value
}));
await _grpcIdentClient.RecreateServerIdentsAsync(msg).ConfigureAwait(false);
}
protected override async Task PreStartStream()
{
await _grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName }).ConfigureAwait(false);
RemoteCachedIdents.Clear();
}
protected override Task StartStream(CancellationToken ct)
{
_ = StreamOnlineClientData(ct);
_ = ReceiveOnlineClientData(ct);
return Task.CompletedTask;
}
protected override async Task PostStartStream()
{
var remoteOnlineClients = await _grpcIdentClient.GetAllIdentsAsync(new ServerMessage()
{
ServerId = _shardName
}).ConfigureAwait(false);
foreach (var result in remoteOnlineClients.UidWithIdent)
{
RemoteCachedIdents[result.Uid.Uid] = result;
}
}
}

View File

@@ -1,11 +0,0 @@
namespace MareSynchronosServer.Services;
public interface IClientIdentificationService : IHostedService
{
string GetCharacterIdentForUid(string uid);
Task<long> GetOnlineUsers();
string GetServerForUid(string uid);
bool IsOnCurrentServer(string uid);
void MarkUserOffline(string uid);
void MarkUserOnline(string uid, string charaIdent);
}

View File

@@ -1,161 +0,0 @@
using Grpc.Core;
using MareSynchronosServer.Identity;
using MareSynchronosShared.Protos;
using Microsoft.AspNetCore.Authorization;
namespace MareSynchronosServer.Services;
internal class GrpcIdentityService : IdentificationService.IdentificationServiceBase
{
private readonly ILogger<GrpcIdentityService> _logger;
private readonly IdentityHandler _handler;
public GrpcIdentityService(ILogger<GrpcIdentityService> logger, IdentityHandler handler)
{
_logger = logger;
_handler = handler;
}
public override async Task<CharacterIdentMessage> GetIdentForUid(UidMessage request, ServerCallContext context)
{
var result = await _handler.GetIdentForUid(request.Uid).ConfigureAwait(false);
return new CharacterIdentMessage()
{
Ident = result.CharacterIdent,
ServerId = result.ServerId
};
}
[AllowAnonymous]
public override Task<OnlineUserCountResponse> GetOnlineUserCount(ServerMessage request, ServerCallContext context)
{
return Task.FromResult(new OnlineUserCountResponse() { Count = _handler.GetOnlineUsers(request.ServerId) });
}
public override Task<Empty> ClearIdentsForServer(ServerMessage request, ServerCallContext context)
{
var idents = _handler.GetIdentsForServer(request.ServerId);
foreach (var entry in idents)
{
EnqueueIdentOffline(new UidWithIdent()
{
Ident = new CharacterIdentMessage()
{
Ident = entry.Value.CharacterIdent,
ServerId = entry.Value.ServerId
},
Uid = new UidMessage()
{
Uid = entry.Key
}
});
}
_handler.ClearIdentsForServer(request.ServerId);
return Task.FromResult(new Empty());
}
public override Task<Empty> RecreateServerIdents(ServerIdentMessage request, ServerCallContext context)
{
foreach (var identMsg in request.Idents)
{
_handler.SetIdent(identMsg.UidWithIdent.Uid.Uid, identMsg.UidWithIdent.Ident.ServerId, identMsg.UidWithIdent.Ident.Ident);
EnqueueIdentOnline(identMsg.UidWithIdent);
}
return Task.FromResult(new Empty());
}
public override async Task<Empty> SendStreamIdentStatusChange(IAsyncStreamReader<IdentChangeMessage> requestStream, ServerCallContext context)
{
await requestStream.MoveNext().ConfigureAwait(false);
var server = requestStream.Current.Server;
if (server == null) throw new System.Exception("First message needs to be server message");
_handler.RegisterServerForQueue(server.ServerId);
_logger.LogInformation("Registered Server " + server.ServerId + " input stream");
while (await requestStream.MoveNext(context.CancellationToken).ConfigureAwait(false))
{
var cur = requestStream.Current.IdentChange;
if (cur == null) throw new System.Exception("Expected client ident change");
_handler.EnqueueIdentChange(cur);
if (cur.IsOnline)
{
_handler.SetIdent(cur.UidWithIdent.Uid.Uid, cur.UidWithIdent.Ident.ServerId, cur.UidWithIdent.Ident.Ident);
}
else
{
_handler.RemoveIdent(cur.UidWithIdent.Uid.Uid, cur.UidWithIdent.Ident.ServerId);
}
}
_logger.LogInformation("Server input stream from " + server + " finished");
return new Empty();
}
public override async Task ReceiveStreamIdentStatusChange(ServerMessage request, IServerStreamWriter<IdentChange> responseStream, ServerCallContext context)
{
var server = request.ServerId;
_logger.LogInformation("Registered Server " + server + " output stream");
try
{
while (!context.CancellationToken.IsCancellationRequested)
{
while (_handler.DequeueIdentChange(server, out var cur))
{
await responseStream.WriteAsync(cur).ConfigureAwait(false);
}
await Task.Delay(10).ConfigureAwait(false);
}
}
catch
{
_logger.LogInformation("Server output stream to " + server + " is faulty");
}
_logger.LogInformation("Server output stream to " + server + " is finished");
}
public override Task<UidWithIdentMessage> GetAllIdents(ServerMessage request, ServerCallContext context)
{
var response = new UidWithIdentMessage();
foreach (var item in _handler.GetIdentsForAllExcept(request.ServerId))
{
response.UidWithIdent.Add(new UidWithIdent()
{
Uid = new UidMessage()
{
Uid = item.Key
},
Ident = new CharacterIdentMessage()
{
Ident = item.Value.CharacterIdent,
ServerId = item.Value.ServerId
}
});
}
return Task.FromResult(response);
}
private void EnqueueIdentOnline(UidWithIdent ident)
{
_handler.EnqueueIdentChange(new IdentChange()
{
IsOnline = true,
UidWithIdent = ident
});
}
private void EnqueueIdentOffline(UidWithIdent ident)
{
_handler.EnqueueIdentChange(new IdentChange()
{
IsOnline = false,
UidWithIdent = ident
});
}
}

View File

@@ -1,60 +0,0 @@
using MareSynchronosShared.Protos;
using System.Collections.Concurrent;
using MareSynchronosServer.Identity;
using MareSynchronosShared.Services;
using MareSynchronosShared.Utils;
namespace MareSynchronosServer.Services;
public class LocalClientIdentificationService : IClientIdentificationService
{
protected readonly ConcurrentDictionary<string, UidWithIdent> OnlineClients = new(StringComparer.Ordinal);
private readonly IdentityHandler _identityHandler;
private readonly string _shardName;
public LocalClientIdentificationService(IdentityHandler identityHandler, IConfigurationService<ServerConfiguration> config)
{
_identityHandler = identityHandler;
_shardName = config.GetValueOrDefault(nameof(ServerConfiguration.ShardName), string.Empty);
}
public string GetCharacterIdentForUid(string uid)
{
return _identityHandler.GetIdentForUid(uid).Result.CharacterIdent;
}
public Task<long> GetOnlineUsers()
{
return Task.FromResult((long)_identityHandler.GetOnlineUsers(string.Empty));
}
public string GetServerForUid(string uid)
{
return _identityHandler.GetIdentForUid(uid).Result.ServerId;
}
public bool IsOnCurrentServer(string uid)
{
return string.Equals(_identityHandler.GetIdentForUid(uid).Result.ServerId, _shardName, StringComparison.Ordinal);
}
public void MarkUserOffline(string uid)
{
_identityHandler.RemoveIdent(uid, _shardName);
}
public void MarkUserOnline(string uid, string charaIdent)
{
_identityHandler.SetIdent(uid, _shardName, charaIdent);
}
public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}

View File

@@ -6,6 +6,7 @@ using MareSynchronosShared.Services;
using MareSynchronosShared.Utils;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
using StackExchange.Redis;
namespace MareSynchronosServer.Services;
@@ -14,21 +15,21 @@ public class SystemInfoService : IHostedService, IDisposable
private readonly MareMetrics _mareMetrics;
private readonly IConfigurationService<ServerConfiguration> _config;
private readonly IServiceProvider _services;
private readonly IClientIdentificationService _clientIdentService;
private readonly ILogger<SystemInfoService> _logger;
private readonly IHubContext<MareHub, IMareHub> _hubContext;
private readonly IConnectionMultiplexer _redis;
private Timer _timer;
public SystemInfoDto SystemInfoDto { get; private set; } = new();
public SystemInfoService(MareMetrics mareMetrics, IConfigurationService<ServerConfiguration> configurationService, IServiceProvider services,
IClientIdentificationService clientIdentService, ILogger<SystemInfoService> logger, IHubContext<MareHub, IMareHub> hubContext)
ILogger<SystemInfoService> logger, IHubContext<MareHub, IMareHub> hubContext, IConnectionMultiplexer connectionMultiplexer)
{
_mareMetrics = mareMetrics;
_config = configurationService;
_services = services;
_clientIdentService = clientIdentService;
_logger = logger;
_hubContext = hubContext;
_redis = connectionMultiplexer;
}
public Task StartAsync(CancellationToken cancellationToken)
@@ -47,7 +48,8 @@ public class SystemInfoService : IHostedService, IDisposable
_mareMetrics.SetGaugeTo(MetricsAPI.GaugeAvailableWorkerThreads, workerThreads);
_mareMetrics.SetGaugeTo(MetricsAPI.GaugeAvailableIOWorkerThreads, ioThreads);
var onlineUsers = (int)_clientIdentService.GetOnlineUsers().Result;
var endpoint = _redis.GetEndPoints().First();
var onlineUsers = (_redis.GetServer(endpoint).Keys(pattern: "UID:*").Count());
SystemInfoDto = new SystemInfoDto()
{
OnlineUsers = onlineUsers,

View File

@@ -13,7 +13,6 @@ using MareSynchronosServer.Services;
using MareSynchronosServer.Utils;
using MareSynchronosServer.RequirementHandlers;
using MareSynchronosShared.Utils;
using MareSynchronosServer.Identity;
using MareSynchronosShared.Services;
using Prometheus;
using Microsoft.Extensions.Options;
@@ -22,6 +21,7 @@ using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.IdentityModel.Tokens;
using System.Text;
using MareSynchronosServer.Authentication;
using StackExchange.Redis;
namespace MareSynchronosServer;
@@ -109,6 +109,12 @@ public class Startup
options.Configuration.ChannelPrefix = "MareSynchronos";
});
}
var options = ConfigurationOptions.Parse(redis);
options.ClientName = "Mare";
options.ChannelPrefix = "UserData";
ConnectionMultiplexer connectionMultiplexer = ConnectionMultiplexer.Connect(options);
services.AddSingleton<IConnectionMultiplexer>(connectionMultiplexer);
}
private void ConfigureIpRateLimiting(IServiceCollection services)
@@ -220,18 +226,6 @@ public class Startup
RetryPolicy = null
};
services.AddGrpcClient<IdentificationService.IdentificationServiceClient>(c =>
{
c.Address = new Uri(mareConfig.GetValue<string>(nameof(ServerConfiguration.MainServerGrpcAddress)));
}).ConfigureChannel(c =>
{
c.ServiceConfig = new ServiceConfig { MethodConfigs = { noRetryConfig } };
c.HttpHandler = new SocketsHttpHandler()
{
EnableMultipleHttp2Connections = true
};
});
services.AddGrpcClient<ConfigurationService.ConfigurationServiceClient>("MainServer", c =>
{
c.Address = new Uri(mareConfig.GetValue<string>(nameof(ServerConfiguration.MainServerGrpcAddress)));
@@ -244,8 +238,6 @@ public class Startup
};
});
services.AddSingleton<IClientIdentificationService, GrpcClientIdentificationService>();
services.AddHostedService(p => p.GetService<IClientIdentificationService>());
services.AddSingleton<IConfigurationService<ServerConfiguration>>(c => new MareConfigurationServiceClient<ServerConfiguration>(
c.GetService<ILogger<MareConfigurationServiceClient<ServerConfiguration>>>(),
c.GetService<IOptions<ServerConfiguration>>(),
@@ -261,8 +253,6 @@ public class Startup
}
else
{
services.AddSingleton<IdentityHandler>();
services.AddSingleton<IClientIdentificationService, LocalClientIdentificationService>();
services.AddSingleton<IConfigurationService<ServerConfiguration>, MareConfigurationServiceServer<ServerConfiguration>>();
services.AddSingleton<IConfigurationService<MareConfigurationAuthBase>, MareConfigurationServiceServer<MareConfigurationAuthBase>>();
@@ -323,7 +313,6 @@ public class Startup
if (config.IsMain)
{
endpoints.MapGrpcService<GrpcIdentityService>().AllowAnonymous();
endpoints.MapGrpcService<GrpcConfigurationService<ServerConfiguration>>().AllowAnonymous();
endpoints.MapGrpcService<GrpcClientMessageService>().AllowAnonymous();
}