Switch to GrpcClientIdentificationService and abolish Redis for Idents (#12)
* add GrpcClientIdentificationService * remove unnecessary gauges * set to no retry policy * initialize metrics Co-authored-by: Stanley Dimant <root.darkarchon@outlook.com>
This commit is contained in:
@@ -8,9 +8,7 @@ using MareSynchronosServer.Utils;
|
||||
using MareSynchronosShared.Authentication;
|
||||
using MareSynchronosShared.Data;
|
||||
using MareSynchronosShared.Metrics;
|
||||
using MareSynchronosShared.Models;
|
||||
using MareSynchronosShared.Protos;
|
||||
using MareSynchronosShared.Services;
|
||||
using Microsoft.AspNetCore.Authorization;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
@@ -27,7 +25,7 @@ public partial class MareHub : Hub
|
||||
private readonly FileService.FileServiceClient _fileServiceClient;
|
||||
private readonly SystemInfoService _systemInfoService;
|
||||
private readonly IHttpContextAccessor _contextAccessor;
|
||||
private readonly IClientIdentificationService _clientIdentService;
|
||||
private readonly GrpcClientIdentificationService _clientIdentService;
|
||||
private readonly MareHubLogger _logger;
|
||||
private readonly MareDbContext _dbContext;
|
||||
private readonly Uri _cdnFullUri;
|
||||
@@ -38,7 +36,7 @@ public partial class MareHub : Hub
|
||||
|
||||
public MareHub(MareMetrics mareMetrics, AuthService.AuthServiceClient authServiceClient, FileService.FileServiceClient fileServiceClient,
|
||||
MareDbContext mareDbContext, ILogger<MareHub> logger, SystemInfoService systemInfoService, IConfiguration configuration, IHttpContextAccessor contextAccessor,
|
||||
IClientIdentificationService clientIdentService)
|
||||
GrpcClientIdentificationService clientIdentService)
|
||||
{
|
||||
_mareMetrics = mareMetrics;
|
||||
_authServiceClient = authServiceClient;
|
||||
@@ -118,9 +116,11 @@ public partial class MareHub : Hub
|
||||
[Authorize(AuthenticationSchemes = SecretKeyGrpcAuthenticationHandler.AuthScheme)]
|
||||
public async Task<bool> CheckClientHealth()
|
||||
{
|
||||
var needsReconnect = string.IsNullOrEmpty(await _clientIdentService.GetCharacterIdentForUid(AuthenticatedUserId).ConfigureAwait(false));
|
||||
if (needsReconnect)
|
||||
var serverId = await _clientIdentService.GetServerForUid(AuthenticatedUserId).ConfigureAwait(false);
|
||||
bool needsReconnect = false;
|
||||
if (string.IsNullOrEmpty(serverId) || !string.Equals(serverId, _shardName, StringComparison.Ordinal))
|
||||
{
|
||||
needsReconnect = true;
|
||||
_logger.LogCallWarning(Api.InvokeCheckClientHealth, needsReconnect);
|
||||
}
|
||||
return needsReconnect;
|
||||
|
||||
@@ -0,0 +1,158 @@
|
||||
using Grpc.Core;
|
||||
using MareSynchronosShared.Metrics;
|
||||
using MareSynchronosShared.Protos;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace MareSynchronosServer.Services;
|
||||
|
||||
public class GrpcClientIdentificationService : IHostedService
|
||||
{
|
||||
private readonly string _shardName;
|
||||
private readonly ILogger<GrpcClientIdentificationService> _logger;
|
||||
private readonly IdentificationService.IdentificationServiceClient _grpcIdentClient;
|
||||
private readonly MareMetrics _metrics;
|
||||
protected ConcurrentDictionary<string, string> OnlineClients = new(StringComparer.Ordinal);
|
||||
private bool _grpcIsFaulty = false;
|
||||
|
||||
public GrpcClientIdentificationService(ILogger<GrpcClientIdentificationService> logger, IdentificationService.IdentificationServiceClient gprcIdentClient, MareMetrics metrics, IConfiguration configuration)
|
||||
{
|
||||
var config = configuration.GetSection("MareSynchronos");
|
||||
_shardName = config.GetValue("ServerName", "Main");
|
||||
_logger = logger;
|
||||
_grpcIdentClient = gprcIdentClient;
|
||||
_metrics = metrics;
|
||||
}
|
||||
|
||||
public async Task<string?> GetCharacterIdentForUid(string uid)
|
||||
{
|
||||
if (OnlineClients.TryGetValue(uid, out string ident))
|
||||
{
|
||||
return ident;
|
||||
}
|
||||
|
||||
var result = await InvokeOnGrpc(_grpcIdentClient.GetIdentForUidAsync(new UidMessage { Uid = uid })).ConfigureAwait(false);
|
||||
if (result == default(CharacterIdentMessage)) return null;
|
||||
return result.Ident;
|
||||
}
|
||||
|
||||
public async Task<string?> GetServerForUid(string uid)
|
||||
{
|
||||
if (OnlineClients.ContainsKey(uid))
|
||||
{
|
||||
return _shardName;
|
||||
}
|
||||
|
||||
var result = await InvokeOnGrpc(_grpcIdentClient.GetIdentForUidAsync(new UidMessage { Uid = uid })).ConfigureAwait(false);
|
||||
if (result == default(CharacterIdentMessage)) return null;
|
||||
return result.ServerId;
|
||||
}
|
||||
|
||||
public async Task<long> GetOnlineUsers()
|
||||
{
|
||||
var result = await InvokeOnGrpc(_grpcIdentClient.GetOnlineUserCountAsync(new ServerMessage())).ConfigureAwait(false);
|
||||
if (result == default(OnlineUserCountResponse)) return OnlineClients.Count;
|
||||
return result.Count;
|
||||
}
|
||||
|
||||
public async Task<string?> GetUidForCharacterIdent(string characterIdent)
|
||||
{
|
||||
bool existsLocal = OnlineClients.Any(o => string.Equals(o.Value, characterIdent, StringComparison.Ordinal));
|
||||
if (existsLocal)
|
||||
{
|
||||
return OnlineClients.First(c => string.Equals(c.Value, characterIdent, StringComparison.Ordinal)).Key;
|
||||
}
|
||||
|
||||
var result = await InvokeOnGrpc(_grpcIdentClient.GetUidForCharacterIdentAsync(new CharacterIdentMessage { Ident = characterIdent, ServerId = string.Empty })).ConfigureAwait(false);
|
||||
if (result == default(UidMessage)) return null;
|
||||
return result.Uid;
|
||||
}
|
||||
|
||||
public async Task MarkUserOffline(string uid)
|
||||
{
|
||||
OnlineClients.TryRemove(uid, out _);
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeAuthorizedConnections, OnlineClients.Count);
|
||||
await ExecuteOnGrpc(_grpcIdentClient.RemoveIdentForUidAsync(new RemoveIdentMessage() { ServerId = _shardName, Uid = uid })).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task MarkUserOnline(string uid, string charaIdent)
|
||||
{
|
||||
OnlineClients[uid] = charaIdent;
|
||||
_metrics.SetGaugeTo(MetricsAPI.GaugeAuthorizedConnections, OnlineClients.Count);
|
||||
await ExecuteOnGrpc(_grpcIdentClient.SetIdentForUidAsync(new SetIdentMessage() { Ident = charaIdent, ServerId = _shardName, Uid = uid })).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await ExecuteOnGrpc(_grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName })).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await ExecuteOnGrpc(_grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName })).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task<T> InvokeOnGrpc<T>(AsyncUnaryCall<T> toExecute)
|
||||
{
|
||||
try
|
||||
{
|
||||
var result = await toExecute.ConfigureAwait(false);
|
||||
|
||||
await CheckFaultStateAndResend().ConfigureAwait(false);
|
||||
|
||||
return result;
|
||||
}
|
||||
catch
|
||||
{
|
||||
SetGrpcFaulty();
|
||||
|
||||
return default;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ExecuteOnGrpc<T>(AsyncUnaryCall<T> toExecute)
|
||||
{
|
||||
try
|
||||
{
|
||||
await toExecute.ConfigureAwait(false);
|
||||
await CheckFaultStateAndResend().ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
SetGrpcFaulty();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task CheckFaultStateAndResend()
|
||||
{
|
||||
if (_grpcIsFaulty)
|
||||
{
|
||||
_logger.LogInformation("GRPC connection is restored, sending current server idents");
|
||||
await _grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName }).ConfigureAwait(false);
|
||||
var msg = new ServerIdentMessage();
|
||||
msg.Idents.AddRange(OnlineClients.Select(c => new SetIdentMessage()
|
||||
{
|
||||
Ident = c.Value,
|
||||
Uid = c.Key,
|
||||
ServerId = _shardName
|
||||
}));
|
||||
await _grpcIdentClient.RecreateServerIdentsAsync(msg).ConfigureAwait(false);
|
||||
_grpcIsFaulty = false;
|
||||
}
|
||||
}
|
||||
|
||||
private void SetGrpcFaulty()
|
||||
{
|
||||
if (!_grpcIsFaulty)
|
||||
{
|
||||
_grpcIsFaulty = true;
|
||||
_logger.LogWarning("GRPC connection is faulty");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6,7 +6,6 @@ using MareSynchronos.API;
|
||||
using MareSynchronosServer.Hubs;
|
||||
using MareSynchronosShared.Data;
|
||||
using MareSynchronosShared.Metrics;
|
||||
using MareSynchronosShared.Services;
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
@@ -19,14 +18,14 @@ public class SystemInfoService : IHostedService, IDisposable
|
||||
{
|
||||
private readonly MareMetrics _mareMetrics;
|
||||
private readonly IServiceProvider _services;
|
||||
private readonly IClientIdentificationService _clientIdentService;
|
||||
private readonly GrpcClientIdentificationService _clientIdentService;
|
||||
private readonly ILogger<SystemInfoService> _logger;
|
||||
private readonly IHubContext<MareHub> _hubContext;
|
||||
private Timer _timer;
|
||||
private string _shardName;
|
||||
public SystemInfoDto SystemInfoDto { get; private set; } = new();
|
||||
|
||||
public SystemInfoService(MareMetrics mareMetrics, IConfiguration configuration, IServiceProvider services, IClientIdentificationService clientIdentService, ILogger<SystemInfoService> logger, IHubContext<MareHub> hubContext)
|
||||
public SystemInfoService(MareMetrics mareMetrics, IConfiguration configuration, IServiceProvider services, GrpcClientIdentificationService clientIdentService, ILogger<SystemInfoService> logger, IHubContext<MareHub> hubContext)
|
||||
{
|
||||
_mareMetrics = mareMetrics;
|
||||
_services = services;
|
||||
@@ -56,7 +55,7 @@ public class SystemInfoService : IHostedService, IDisposable
|
||||
{
|
||||
SystemInfoDto = new SystemInfoDto()
|
||||
{
|
||||
OnlineUsers = _clientIdentService.GetOnlineUsers().Result,
|
||||
OnlineUsers = (int)_clientIdentService.GetOnlineUsers().Result,
|
||||
};
|
||||
|
||||
_hubContext.Clients.All.SendAsync(Api.OnUpdateSystemInfo, SystemInfoDto);
|
||||
|
||||
@@ -20,7 +20,6 @@ using Prometheus;
|
||||
using MareSynchronosShared.Metrics;
|
||||
using System.Collections.Generic;
|
||||
using MareSynchronosServer.Services;
|
||||
using MareSynchronosShared.Services;
|
||||
using System.Net.Http;
|
||||
using MareSynchronosServer.Utils;
|
||||
|
||||
@@ -64,6 +63,12 @@ public class Startup
|
||||
}
|
||||
};
|
||||
|
||||
var identMethodConfig = new MethodConfig
|
||||
{
|
||||
Names = { MethodName.Default },
|
||||
RetryPolicy = null
|
||||
};
|
||||
|
||||
services.AddSingleton(new MareMetrics(new List<string>
|
||||
{
|
||||
MetricsAPI.CounterInitializedConnections,
|
||||
@@ -77,7 +82,10 @@ public class Startup
|
||||
MetricsAPI.GaugePairs,
|
||||
MetricsAPI.GaugePairsPaused,
|
||||
MetricsAPI.GaugeAvailableIOWorkerThreads,
|
||||
MetricsAPI.GaugeAvailableWorkerThreads
|
||||
MetricsAPI.GaugeAvailableWorkerThreads,
|
||||
MetricsAPI.GaugeGroups,
|
||||
MetricsAPI.GaugeGroupPairs,
|
||||
MetricsAPI.GaugeGroupPairsPaused
|
||||
}));
|
||||
|
||||
services.AddGrpcClient<AuthService.AuthServiceClient>(c =>
|
||||
@@ -98,6 +106,20 @@ public class Startup
|
||||
{
|
||||
c.ServiceConfig = new ServiceConfig { MethodConfigs = { defaultMethodConfig } };
|
||||
});
|
||||
services.AddGrpcClient<IdentificationService.IdentificationServiceClient>(c =>
|
||||
{
|
||||
c.Address = new Uri(mareConfig.GetValue<string>("ServiceAddress"));
|
||||
}).ConfigureChannel(c =>
|
||||
{
|
||||
c.ServiceConfig = new ServiceConfig { MethodConfigs = { identMethodConfig } };
|
||||
c.HttpHandler = new SocketsHttpHandler()
|
||||
{
|
||||
EnableMultipleHttp2Connections = true
|
||||
};
|
||||
});
|
||||
|
||||
services.AddSingleton<GrpcClientIdentificationService>();
|
||||
services.AddHostedService(p => p.GetService<GrpcClientIdentificationService>());
|
||||
|
||||
services.AddDbContextPool<MareDbContext>(options =>
|
||||
{
|
||||
@@ -135,19 +157,6 @@ public class Startup
|
||||
{
|
||||
options.Configuration.ChannelPrefix = "MareSynchronos";
|
||||
});
|
||||
|
||||
services.AddStackExchangeRedisCache(opt =>
|
||||
{
|
||||
opt.Configuration = redis;
|
||||
opt.InstanceName = "MareSynchronosCache:";
|
||||
});
|
||||
services.AddSingleton<IClientIdentificationService, DistributedClientIdentificationService>();
|
||||
services.AddHostedService(p => p.GetService<IClientIdentificationService>());
|
||||
}
|
||||
else
|
||||
{
|
||||
services.AddSingleton<IClientIdentificationService, LocalClientIdentificationService>();
|
||||
services.AddHostedService(p => p.GetService<IClientIdentificationService>());
|
||||
}
|
||||
|
||||
services.AddHostedService(provider => provider.GetService<SystemInfoService>());
|
||||
|
||||
Reference in New Issue
Block a user