Switch Authentication to asynchronous streaming calls (#16)

* add base grpc service and swap auth service to streaming

* remove Authorize from hub itself

* remove unused usings

* heave files server to net 7, add exception handling in grpc auth stream

Co-authored-by: rootdarkarchon <root.darkarchon@outlook.com>
This commit is contained in:
rootdarkarchon
2022-10-13 16:55:23 +02:00
committed by GitHub
parent d37c1208fe
commit c98e2b2dd6
20 changed files with 313 additions and 159 deletions

View File

@@ -4,12 +4,8 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System;
using Microsoft.AspNetCore.SignalR;
using System.Globalization;
using MareSynchronos.API;
using MareSynchronosServer.Utils;
using System.Security.Claims;
using Microsoft.Extensions.Logging;
namespace MareSynchronosServer.Hubs;

View File

@@ -3,7 +3,6 @@ using System.Linq;
using System.Threading.Tasks;
using MareSynchronos.API;
using MareSynchronosServer.Utils;
using MareSynchronosShared.Authentication;
using MareSynchronosShared.Metrics;
using MareSynchronosShared.Models;
using MareSynchronosShared.Protos;
@@ -38,7 +37,7 @@ public partial class MareHub
await Task.Delay(1000).ConfigureAwait(false);
}
await _authServiceClient.RemoveAuthAsync(new RemoveAuthRequest() { Uid = userid }).ConfigureAwait(false);
await _authServiceClient.RemoveAuthAsync(new UidMessage() { Uid = userid }).ConfigureAwait(false);
_dbContext.RemoveRange(ownPairData);
await _dbContext.SaveChangesAsync().ConfigureAwait(false);

View File

@@ -5,7 +5,6 @@ using System.Threading.Tasks;
using MareSynchronos.API;
using MareSynchronosServer.Services;
using MareSynchronosServer.Utils;
using MareSynchronosShared.Authentication;
using MareSynchronosShared.Data;
using MareSynchronosShared.Metrics;
using MareSynchronosShared.Protos;
@@ -18,7 +17,6 @@ using Microsoft.Extensions.Logging;
namespace MareSynchronosServer.Hubs;
[Authorize]
public partial class MareHub : Hub<IMareHub>, IMareHub
{
private readonly MareMetrics _mareMetrics;

View File

@@ -1,8 +1,8 @@
using Grpc.Core;
using MareSynchronosShared.Metrics;
using MareSynchronosShared.Protos;
using MareSynchronosShared.Services;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
@@ -12,7 +12,7 @@ using System.Threading.Tasks;
namespace MareSynchronosServer.Services;
public class GrpcClientIdentificationService : IHostedService
public class GrpcClientIdentificationService : GrpcBaseService
{
private readonly string _shardName;
private readonly ILogger<GrpcClientIdentificationService> _logger;
@@ -22,14 +22,11 @@ public class GrpcClientIdentificationService : IHostedService
private readonly MareMetrics _metrics;
protected readonly ConcurrentDictionary<string, UidWithIdent> OnlineClients = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, UidWithIdent> RemoteCachedIdents = new(StringComparer.Ordinal);
private bool _grpcIsFaulty = false;
private ConcurrentQueue<IdentChange> _identChangeQueue = new();
private CancellationTokenSource _streamCts = new();
private CancellationTokenSource _faultCheckCts = new();
public GrpcClientIdentificationService(ILogger<GrpcClientIdentificationService> logger, IdentificationService.IdentificationServiceClient gprcIdentClient,
IdentificationService.IdentificationServiceClient gprcIdentClientStreamOut,
IdentificationService.IdentificationServiceClient gprcIdentClientStreamIn, MareMetrics metrics, IConfiguration configuration)
IdentificationService.IdentificationServiceClient gprcIdentClientStreamIn, MareMetrics metrics, IConfiguration configuration) : base(logger)
{
var config = configuration.GetSection("MareSynchronos");
_shardName = config.GetValue("ShardName", "Main");
@@ -135,62 +132,6 @@ public class GrpcClientIdentificationService : IHostedService
});
}
public async Task StartAsync(CancellationToken cancellationToken)
{
_ = RestartStreams();
_ = CheckGrpcFaults(_faultCheckCts.Token);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
_faultCheckCts.Cancel();
_streamCts.Cancel();
await ExecuteOnGrpc(_grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName })).ConfigureAwait(false);
}
private async Task CheckGrpcFaults(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try
{
await CheckFaultStateAndResend().ConfigureAwait(false);
}
catch { SetGrpcFaulty(); }
await Task.Delay(250).ConfigureAwait(false);
}
}
private async Task RestartStreams()
{
_streamCts?.Cancel();
_streamCts?.Dispose();
_streamCts = new();
if (!_grpcIsFaulty)
{
try
{
await _grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName }).ConfigureAwait(false);
RemoteCachedIdents.Clear();
_ = StreamOnlineClientData(_streamCts.Token);
_ = ReceiveOnlineClientData(_streamCts.Token);
var remoteOnlineClients = await _grpcIdentClient.GetAllIdentsAsync(new ServerMessage()
{
ServerId = _shardName
}).ConfigureAwait(false);
foreach (var result in remoteOnlineClients.UidWithIdent)
{
RemoteCachedIdents[result.Uid.Uid] = result;
}
}
catch
{
SetGrpcFaulty();
}
}
}
private async Task StreamOnlineClientData(CancellationToken cts)
{
try
@@ -254,59 +195,49 @@ public class GrpcClientIdentificationService : IHostedService
}
}
private async Task<T> InvokeOnGrpc<T>(AsyncUnaryCall<T> toExecute)
protected override Task StartAsyncInternal(CancellationToken cancellationToken)
{
try
{
var result = await toExecute.ConfigureAwait(false);
await CheckFaultStateAndResend().ConfigureAwait(false);
return result;
}
catch
{
SetGrpcFaulty();
return default;
}
return Task.CompletedTask;
}
private async Task ExecuteOnGrpc<T>(AsyncUnaryCall<T> toExecute)
protected override async Task StopAsyncInternal(CancellationToken cancellationToken)
{
try
{
await toExecute.ConfigureAwait(false);
await CheckFaultStateAndResend().ConfigureAwait(false);
}
catch
{
SetGrpcFaulty();
}
await ExecuteOnGrpc(_grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName })).ConfigureAwait(false);
}
private async Task CheckFaultStateAndResend()
protected override async Task OnGrpcRestore()
{
if (_grpcIsFaulty)
var msg = new ServerIdentMessage();
msg.Idents.AddRange(OnlineClients.Select(c => new SetIdentMessage()
{
await RestartStreams().ConfigureAwait(false);
var msg = new ServerIdentMessage();
msg.Idents.AddRange(OnlineClients.Select(c => new SetIdentMessage()
{
UidWithIdent = c.Value
}));
await _grpcIdentClient.RecreateServerIdentsAsync(msg).ConfigureAwait(false);
_logger.LogInformation("GRPC connection is restored");
_grpcIsFaulty = false;
}
UidWithIdent = c.Value
}));
await _grpcIdentClient.RecreateServerIdentsAsync(msg).ConfigureAwait(false);
}
private void SetGrpcFaulty()
protected override async Task PreStartStream()
{
if (!_grpcIsFaulty)
await _grpcIdentClient.ClearIdentsForServerAsync(new ServerMessage() { ServerId = _shardName }).ConfigureAwait(false);
RemoteCachedIdents.Clear();
}
protected override Task StartStream(CancellationToken ct)
{
_ = StreamOnlineClientData(ct);
_ = ReceiveOnlineClientData(ct);
return Task.CompletedTask;
}
protected override async Task PostStartStream()
{
var remoteOnlineClients = await _grpcIdentClient.GetAllIdentsAsync(new ServerMessage()
{
_grpcIsFaulty = true;
_logger.LogWarning("GRPC connection is faulty");
ServerId = _shardName
}).ConfigureAwait(false);
foreach (var result in remoteOnlineClients.UidWithIdent)
{
RemoteCachedIdents[result.Uid.Uid] = result;
}
}
}
}

View File

@@ -23,6 +23,7 @@ using MareSynchronosServer.Services;
using System.Net.Http;
using MareSynchronosServer.Utils;
using MareSynchronosServer.RequirementHandlers;
using MareSynchronosShared.Services;
namespace MareSynchronosServer;
@@ -119,8 +120,10 @@ public class Startup
};
});
services.AddSingleton<GrpcAuthenticationService>();
services.AddSingleton<GrpcClientIdentificationService>();
services.AddTransient<IAuthorizationHandler, UserRequirementHandler>();
services.AddHostedService(p => p.GetService<GrpcAuthenticationService>());
services.AddHostedService(p => p.GetService<GrpcClientIdentificationService>());
services.AddDbContextPool<MareDbContext>(options =>