adjust grpc api for idents
This commit is contained in:
@@ -18,11 +18,12 @@ public class GrpcClientIdentificationService : IHostedService
|
||||
private readonly ILogger<GrpcClientIdentificationService> _logger;
|
||||
private readonly IdentificationService.IdentificationServiceClient _grpcIdentClient;
|
||||
private readonly MareMetrics _metrics;
|
||||
protected ConcurrentDictionary<string, string> OnlineClients = new(StringComparer.Ordinal);
|
||||
private ConcurrentDictionary<string, string> RemoteCachedIdents = new(StringComparer.Ordinal);
|
||||
private readonly TimeSpan InvalidateCachedIdent = TimeSpan.FromSeconds(30);
|
||||
protected readonly ConcurrentDictionary<string, UidWithIdent> OnlineClients = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, UidWithIdent> RemoteCachedIdents = new(StringComparer.Ordinal);
|
||||
private bool _grpcIsFaulty = false;
|
||||
private CancellationTokenSource _cacheVerificationCts = new();
|
||||
private ConcurrentQueue<IdentChange> _identChangeQueue = new();
|
||||
private CancellationTokenSource _streamCts = new();
|
||||
private CancellationTokenSource _faultCheckCts = new();
|
||||
|
||||
public GrpcClientIdentificationService(ILogger<GrpcClientIdentificationService> logger, IdentificationService.IdentificationServiceClient gprcIdentClient, MareMetrics metrics, IConfiguration configuration)
|
||||
{
|
||||
@@ -33,74 +34,34 @@ public class GrpcClientIdentificationService : IHostedService
|
||||
_metrics = metrics;
|
||||
}
|
||||
|
||||
private async Task HandleCacheVerification(CancellationToken ct)
|
||||
public string? GetCharacterIdentForUid(string uid)
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
if (OnlineClients.TryGetValue(uid, out var ident))
|
||||
{
|
||||
try
|
||||
{
|
||||
if (RemoteCachedIdents.Any())
|
||||
{
|
||||
MultiUidMessage req = new();
|
||||
req.Uids.AddRange(RemoteCachedIdents.Select(k => new UidMessage() { Uid = k.Key }));
|
||||
var cacheResponse = await InvokeOnGrpc(_grpcIdentClient.ValidateCachedIdentsAsync(req)).ConfigureAwait(false);
|
||||
|
||||
foreach (var entry in cacheResponse.UidWithIdent)
|
||||
{
|
||||
if (!RemoteCachedIdents.TryGetValue(entry.Uid.Uid, out var ident))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (string.IsNullOrEmpty(entry.Ident.Ident))
|
||||
{
|
||||
RemoteCachedIdents.TryRemove(entry.Uid.Uid, out var _);
|
||||
continue;
|
||||
}
|
||||
|
||||
RemoteCachedIdents[entry.Uid.Uid] = entry.Ident.Ident;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Error during cached idents verification");
|
||||
}
|
||||
finally
|
||||
{
|
||||
await Task.Delay(InvalidateCachedIdent, ct).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<string?> GetCharacterIdentForUid(string uid)
|
||||
{
|
||||
if (OnlineClients.TryGetValue(uid, out string ident))
|
||||
{
|
||||
return ident;
|
||||
return ident.Ident.Ident;
|
||||
}
|
||||
|
||||
if (RemoteCachedIdents.TryGetValue(uid, out var cachedIdent))
|
||||
{
|
||||
return cachedIdent;
|
||||
return cachedIdent.Ident.Ident;
|
||||
}
|
||||
|
||||
var result = await InvokeOnGrpc(_grpcIdentClient.GetIdentForUidAsync(new UidMessage { Uid = uid })).ConfigureAwait(false);
|
||||
if (result == default(CharacterIdentMessage)) return null;
|
||||
RemoteCachedIdents[uid] = result.Ident;
|
||||
return result.Ident;
|
||||
return null;
|
||||
}
|
||||
|
||||
public async Task<string?> GetServerForUid(string uid)
|
||||
public string? GetServerForUid(string uid)
|
||||
{
|
||||
if (OnlineClients.ContainsKey(uid))
|
||||
{
|
||||
return _shardName;
|
||||
}
|
||||
|
||||
var result = await InvokeOnGrpc(_grpcIdentClient.GetIdentForUidAsync(new UidMessage { Uid = uid })).ConfigureAwait(false);
|
||||
if (result == default(CharacterIdentMessage)) return null;
|
||||
return result.ServerId;
|
||||
if (RemoteCachedIdents.TryGetValue(uid, out var cachedIdent))
|
||||
{
|
||||
return cachedIdent.Ident.ServerId;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public async Task<long> GetOnlineUsers()
|
||||
@@ -110,46 +71,172 @@ public class GrpcClientIdentificationService : IHostedService
|
||||
return result.Count;
|
||||
}
|
||||
|
||||
public async Task<string?> GetUidForCharacterIdent(string characterIdent)
|
||||
public string? GetUidForCharacterIdent(string characterIdent)
|
||||
{
|
||||
bool existsLocal = OnlineClients.Any(o => string.Equals(o.Value, characterIdent, StringComparison.Ordinal));
|
||||
bool existsLocal = OnlineClients.Any(o => string.Equals(o.Value.Ident.Ident, characterIdent, StringComparison.Ordinal));
|
||||
if (existsLocal)
|
||||
{
|
||||
return OnlineClients.First(c => string.Equals(c.Value, characterIdent, StringComparison.Ordinal)).Key;
|
||||
return OnlineClients.First(c => string.Equals(c.Value.Ident.Ident, characterIdent, StringComparison.Ordinal)).Key;
|
||||
}
|
||||
|
||||
var result = await InvokeOnGrpc(_grpcIdentClient.GetUidForCharacterIdentAsync(new CharacterIdentMessage { Ident = characterIdent, ServerId = string.Empty })).ConfigureAwait(false);
|
||||
if (result == default(UidMessage)) return null;
|
||||
return result.Uid;
|
||||
bool existsCached = RemoteCachedIdents.Any(o => string.Equals(o.Value.Ident.Ident, characterIdent, StringComparison.Ordinal));
|
||||
if (existsCached)
|
||||
{
|
||||
return RemoteCachedIdents.First(c => string.Equals(c.Value.Ident.Ident, characterIdent, StringComparison.Ordinal)).Key;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public async Task MarkUserOffline(string uid)
|
||||
public void MarkUserOffline(string uid)
|
||||
{
|
||||
OnlineClients.TryRemove(uid, out _);
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeAuthorizedConnections, OnlineClients.Count);
|
||||
await ExecuteOnGrpc(_grpcIdentClient.RemoveIdentForUidAsync(new RemoveIdentMessage() { ServerId = _shardName, Uid = uid })).ConfigureAwait(false);
|
||||
if (OnlineClients.TryRemove(uid, out var uidWithIdent))
|
||||
{
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeAuthorizedConnections, OnlineClients.Count);
|
||||
_identChangeQueue.Enqueue(new IdentChange()
|
||||
{
|
||||
IsOnline = false,
|
||||
UidWithIdent = uidWithIdent
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public async Task MarkUserOnline(string uid, string charaIdent)
|
||||
public void MarkUserOnline(string uid, string charaIdent)
|
||||
{
|
||||
OnlineClients[uid] = charaIdent;
|
||||
OnlineClients[uid] = new UidWithIdent()
|
||||
{
|
||||
Uid = new()
|
||||
{
|
||||
Uid = uid,
|
||||
},
|
||||
Ident = new()
|
||||
{
|
||||
Ident = charaIdent,
|
||||
ServerId = _shardName
|
||||
}
|
||||
};
|
||||
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeAuthorizedConnections, OnlineClients.Count);
|
||||
await ExecuteOnGrpc(_grpcIdentClient.SetIdentForUidAsync(new SetIdentMessage() { Ident = charaIdent, ServerId = _shardName, Uid = uid })).ConfigureAwait(false);
|
||||
_identChangeQueue.Enqueue(new IdentChange()
|
||||
{
|
||||
IsOnline = true,
|
||||
UidWithIdent = OnlineClients[uid]
|
||||
});
|
||||
}
|
||||
|
||||
public async Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await ExecuteOnGrpc(_grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName })).ConfigureAwait(false);
|
||||
|
||||
_ = HandleCacheVerification(_cacheVerificationCts.Token);
|
||||
_ = RestartStreams();
|
||||
_ = CheckGrpcFaults(_faultCheckCts.Token);
|
||||
}
|
||||
|
||||
public async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_cacheVerificationCts.Cancel();
|
||||
_faultCheckCts.Cancel();
|
||||
_streamCts.Cancel();
|
||||
await ExecuteOnGrpc(_grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName })).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task CheckGrpcFaults(CancellationToken ct)
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
await CheckFaultStateAndResend().ConfigureAwait(false);
|
||||
await Task.Delay(250).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task RestartStreams()
|
||||
{
|
||||
_streamCts?.Cancel();
|
||||
_streamCts?.Dispose();
|
||||
_streamCts = new();
|
||||
if (!_grpcIsFaulty)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName }).ConfigureAwait(false);
|
||||
|
||||
RemoteCachedIdents.Clear();
|
||||
_ = StreamOnlineClientData(_streamCts.Token);
|
||||
_ = ReceiveOnlineClientData(_streamCts.Token);
|
||||
var remoteOnlineClients = await _grpcIdentClient.GetAllIdentsAsync(new ServerMessage()
|
||||
{
|
||||
ServerId = _shardName
|
||||
}).ConfigureAwait(false);
|
||||
foreach (var result in remoteOnlineClients.UidWithIdent)
|
||||
{
|
||||
RemoteCachedIdents[result.Uid.Uid] = result;
|
||||
}
|
||||
}
|
||||
catch
|
||||
{
|
||||
SetGrpcFaulty();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task StreamOnlineClientData(CancellationToken cts)
|
||||
{
|
||||
try
|
||||
{
|
||||
var stream = _grpcIdentClient.SendStreamIdentStatusChange(cancellationToken: cts);
|
||||
await stream.RequestStream.WriteAsync(new IdentChangeMessage()
|
||||
{
|
||||
Server = new ServerMessage()
|
||||
{
|
||||
ServerId = _shardName
|
||||
}
|
||||
}).ConfigureAwait(false);
|
||||
|
||||
while (!cts.IsCancellationRequested)
|
||||
{
|
||||
if (_identChangeQueue.TryDequeue(out var result))
|
||||
{
|
||||
await stream.RequestStream.WriteAsync(new() { IdentChange = result }, cts).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
await Task.Delay(25, cts).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) { return; }
|
||||
catch
|
||||
{
|
||||
SetGrpcFaulty();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ReceiveOnlineClientData(CancellationToken cts)
|
||||
{
|
||||
try
|
||||
{
|
||||
var stream = _grpcIdentClient.ReceiveStreamIdentStatusChange(new ServerMessage()
|
||||
{
|
||||
ServerId = _shardName,
|
||||
});
|
||||
while (await stream.ResponseStream.MoveNext(cts).ConfigureAwait(false))
|
||||
{
|
||||
var cur = stream.ResponseStream.Current;
|
||||
if (cur.IsOnline)
|
||||
{
|
||||
RemoteCachedIdents[cur.UidWithIdent.Uid.Uid] = cur.UidWithIdent;
|
||||
}
|
||||
else if (RemoteCachedIdents.TryGetValue(cur.UidWithIdent.Uid.Uid, out var existingIdent)
|
||||
&& string.Equals(existingIdent.Ident.ServerId, cur.UidWithIdent.Ident.ServerId, StringComparison.Ordinal))
|
||||
{
|
||||
RemoteCachedIdents.TryRemove(cur.UidWithIdent.Uid.Uid, out _);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) { return; }
|
||||
catch
|
||||
{
|
||||
SetGrpcFaulty();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<T> InvokeOnGrpc<T>(AsyncUnaryCall<T> toExecute)
|
||||
{
|
||||
try
|
||||
@@ -188,13 +275,11 @@ public class GrpcClientIdentificationService : IHostedService
|
||||
_grpcIsFaulty = false;
|
||||
|
||||
_logger.LogInformation("GRPC connection is restored, sending current server idents");
|
||||
await _grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName }).ConfigureAwait(false);
|
||||
await RestartStreams().ConfigureAwait(false);
|
||||
var msg = new ServerIdentMessage();
|
||||
msg.Idents.AddRange(OnlineClients.Select(c => new SetIdentMessage()
|
||||
{
|
||||
Ident = c.Value,
|
||||
Uid = c.Key,
|
||||
ServerId = _shardName
|
||||
UidWithIdent = c.Value
|
||||
}));
|
||||
await _grpcIdentClient.RecreateServerIdentsAsync(msg).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user