Add GPose Together capabilities to server (#32)
* most of gpose together impl * some metrics and fixes * api --------- Co-authored-by: Stanley Dimant <root.darkarchon@outlook.com>
This commit is contained in:
2
MareAPI
2
MareAPI
Submodule MareAPI updated: 4e4b2dab17...8b77956ec8
@@ -1,5 +1,7 @@
|
||||
using MareSynchronos.API.Data.Enum;
|
||||
using MareSynchronos.API.Data;
|
||||
using MareSynchronos.API.Data.Enum;
|
||||
using MareSynchronos.API.Dto;
|
||||
using MareSynchronos.API.Dto.CharaData;
|
||||
using MareSynchronos.API.Dto.Chat;
|
||||
using MareSynchronos.API.Dto.Group;
|
||||
using MareSynchronos.API.Dto.User;
|
||||
@@ -51,5 +53,11 @@ namespace MareSynchronosServer.Hubs
|
||||
public Task Client_UserUpdateProfile(UserDto dto) => throw new PlatformNotSupportedException("Calling clientside method on server not supported");
|
||||
|
||||
public Task Client_UserUpdateSelfPairPermissions(UserPermissionsDto dto) => throw new PlatformNotSupportedException("Calling clientside method on server not supported");
|
||||
|
||||
public Task Client_GposeLobbyJoin(UserData userData) => throw new PlatformNotSupportedException("Calling clientside method on server not supported");
|
||||
public Task Client_GposeLobbyLeave(UserData userData) => throw new PlatformNotSupportedException("Calling clientside method on server not supported");
|
||||
public Task Client_GposeLobbyPushCharacterData(CharaDataDownloadDto charaDownloadDto) => throw new PlatformNotSupportedException("Calling clientside method on server not supported");
|
||||
public Task Client_GposeLobbyPushPoseData(UserData userData, PoseData poseData) => throw new PlatformNotSupportedException("Calling clientside method on server not supported");
|
||||
public Task Client_GposeLobbyPushWorldData(UserData userData, WorldData worldData) => throw new PlatformNotSupportedException("Calling clientside method on server not supported");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,155 @@
|
||||
using MareSynchronos.API.Data;
|
||||
using MareSynchronos.API.Dto.CharaData;
|
||||
using MareSynchronosServer.Utils;
|
||||
using MareSynchronosShared.Metrics;
|
||||
using MareSynchronosShared.Utils;
|
||||
using Microsoft.AspNetCore.Authorization;
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
|
||||
namespace MareSynchronosServer.Hubs;
|
||||
|
||||
public partial class MareHub
|
||||
{
|
||||
private async Task<string?> GetUserGposeLobby()
|
||||
{
|
||||
return await _redis.GetAsync<string>(GposeLobbyUser).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task<List<string>> GetUsersInLobby(string lobbyId, bool includeSelf = false)
|
||||
{
|
||||
var users = await _redis.GetAsync<List<string>>($"GposeLobby:{lobbyId}").ConfigureAwait(false);
|
||||
return users?.Where(u => includeSelf || !string.Equals(u, UserUID, StringComparison.Ordinal)).ToList() ?? [];
|
||||
}
|
||||
|
||||
private async Task AddUserToLobby(string lobbyId, List<string> priorUsers)
|
||||
{
|
||||
_mareMetrics.IncGauge(MetricsAPI.GaugeGposeLobbyUsers);
|
||||
if (priorUsers.Count == 0)
|
||||
_mareMetrics.IncGauge(MetricsAPI.GaugeGposeLobbies);
|
||||
|
||||
await _redis.AddAsync(GposeLobbyUser, lobbyId).ConfigureAwait(false);
|
||||
await _redis.AddAsync($"GposeLobby:{lobbyId}", priorUsers.Concat([UserUID])).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task RemoveUserFromLobby(string lobbyId, List<string> priorUsers)
|
||||
{
|
||||
await _redis.RemoveAsync(GposeLobbyUser).ConfigureAwait(false);
|
||||
|
||||
_mareMetrics.DecGauge(MetricsAPI.GaugeGposeLobbyUsers);
|
||||
|
||||
if (priorUsers.Count == 1)
|
||||
{
|
||||
await _redis.RemoveAsync($"GposeLobby:{lobbyId}").ConfigureAwait(false);
|
||||
_mareMetrics.DecGauge(MetricsAPI.GaugeGposeLobbies);
|
||||
}
|
||||
else
|
||||
{
|
||||
priorUsers.Remove(UserUID);
|
||||
await _redis.AddAsync($"GposeLobby:{lobbyId}", priorUsers).ConfigureAwait(false);
|
||||
await Clients.Users(priorUsers).Client_GposeLobbyLeave(new(UserUID)).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private string GposeLobbyUser => $"GposeLobbyUser:{UserUID}";
|
||||
|
||||
|
||||
[Authorize(Policy = "Identified")]
|
||||
public async Task<string> GposeLobbyCreate()
|
||||
{
|
||||
_logger.LogCallInfo();
|
||||
var alreadyInLobby = await GetUserGposeLobby().ConfigureAwait(false);
|
||||
if (!string.IsNullOrEmpty(alreadyInLobby))
|
||||
{
|
||||
throw new HubException("Already in GPose Lobby, cannot join another");
|
||||
}
|
||||
|
||||
string lobbyId = string.Empty;
|
||||
while (string.IsNullOrEmpty(lobbyId))
|
||||
{
|
||||
lobbyId = StringUtils.GenerateRandomString(30, "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789");
|
||||
var result = await _redis.GetAsync<List<string>>($"GposeLobby:{lobbyId}").ConfigureAwait(false);
|
||||
if (result != null)
|
||||
lobbyId = string.Empty;
|
||||
}
|
||||
|
||||
await AddUserToLobby(lobbyId, []).ConfigureAwait(false);
|
||||
|
||||
return lobbyId;
|
||||
}
|
||||
|
||||
[Authorize(Policy = "Identified")]
|
||||
public async Task<List<UserData>> GposeLobbyJoin(string lobbyId)
|
||||
{
|
||||
_logger.LogCallInfo();
|
||||
var existingLobbyId = await GetUserGposeLobby().ConfigureAwait(false);
|
||||
if (!string.IsNullOrEmpty(existingLobbyId))
|
||||
await GposeLobbyLeave().ConfigureAwait(false);
|
||||
|
||||
var lobbyUsers = await GetUsersInLobby(lobbyId).ConfigureAwait(false);
|
||||
if (!lobbyUsers.Any())
|
||||
return [];
|
||||
|
||||
await AddUserToLobby(lobbyId, lobbyUsers).ConfigureAwait(false);
|
||||
|
||||
var user = await DbContext.Users.SingleAsync(u => u.UID == UserUID).ConfigureAwait(false);
|
||||
await Clients.Users(lobbyUsers.Where(u => !string.Equals(u, UserUID, StringComparison.Ordinal)))
|
||||
.Client_GposeLobbyJoin(user.ToUserData()).ConfigureAwait(false);
|
||||
|
||||
var users = await DbContext.Users.Where(u => lobbyUsers.Contains(u.UID))
|
||||
.Select(u => u.ToUserData())
|
||||
.ToListAsync()
|
||||
.ConfigureAwait(false);
|
||||
|
||||
return users;
|
||||
}
|
||||
|
||||
[Authorize(Policy = "Identified")]
|
||||
public async Task<bool> GposeLobbyLeave()
|
||||
{
|
||||
var lobbyId = await GetUserGposeLobby().ConfigureAwait(false);
|
||||
if (string.IsNullOrEmpty(lobbyId))
|
||||
return true;
|
||||
|
||||
_logger.LogCallInfo();
|
||||
|
||||
var lobbyUsers = await GetUsersInLobby(lobbyId, true).ConfigureAwait(false);
|
||||
await RemoveUserFromLobby(lobbyId, lobbyUsers).ConfigureAwait(false);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
[Authorize(Policy = "Identified")]
|
||||
public async Task GposeLobbyPushCharacterData(CharaDataDownloadDto charaDataDownloadDto)
|
||||
{
|
||||
_logger.LogCallInfo();
|
||||
var lobbyId = await GetUserGposeLobby().ConfigureAwait(false);
|
||||
if (string.IsNullOrEmpty(lobbyId))
|
||||
return;
|
||||
|
||||
var lobbyUsers = await GetUsersInLobby(lobbyId).ConfigureAwait(false);
|
||||
await Clients.Users(lobbyUsers).Client_GposeLobbyPushCharacterData(charaDataDownloadDto).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
[Authorize(Policy = "Identified")]
|
||||
public async Task GposeLobbyPushPoseData(PoseData poseData)
|
||||
{
|
||||
_logger.LogCallInfo();
|
||||
var lobbyId = await GetUserGposeLobby().ConfigureAwait(false);
|
||||
if (string.IsNullOrEmpty(lobbyId))
|
||||
return;
|
||||
|
||||
await _gPoseLobbyDistributionService.PushPoseData(lobbyId, UserUID, poseData).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
[Authorize(Policy = "Identified")]
|
||||
public async Task GposeLobbyPushWorldData(WorldData worldData)
|
||||
{
|
||||
_logger.LogCallInfo();
|
||||
var lobbyId = await GetUserGposeLobby().ConfigureAwait(false);
|
||||
if (string.IsNullOrEmpty(lobbyId))
|
||||
return;
|
||||
|
||||
await _gPoseLobbyDistributionService.PushWorldData(lobbyId, UserUID, worldData).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
@@ -28,6 +28,7 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
|
||||
private readonly int _maxJoinedGroupsByUser;
|
||||
private readonly int _maxGroupUserCount;
|
||||
private readonly IRedisDatabase _redis;
|
||||
private readonly GPoseLobbyDistributionService _gPoseLobbyDistributionService;
|
||||
private readonly Uri _fileServerAddress;
|
||||
private readonly Version _expectedClientVersion;
|
||||
private readonly int _maxCharaDataByUser;
|
||||
@@ -38,7 +39,7 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
|
||||
public MareHub(MareMetrics mareMetrics,
|
||||
IDbContextFactory<MareDbContext> mareDbContextFactory, ILogger<MareHub> logger, SystemInfoService systemInfoService,
|
||||
IConfigurationService<ServerConfiguration> configuration, IHttpContextAccessor contextAccessor,
|
||||
IRedisDatabase redisDb)
|
||||
IRedisDatabase redisDb, GPoseLobbyDistributionService gPoseLobbyDistributionService)
|
||||
{
|
||||
_mareMetrics = mareMetrics;
|
||||
_systemInfoService = systemInfoService;
|
||||
@@ -51,6 +52,7 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
|
||||
_maxCharaDataByUser = configuration.GetValueOrDefault(nameof(ServerConfiguration.MaxCharaDataByUser), 10);
|
||||
_contextAccessor = contextAccessor;
|
||||
_redis = redisDb;
|
||||
_gPoseLobbyDistributionService = gPoseLobbyDistributionService;
|
||||
_logger = new MareHubLogger(this, logger);
|
||||
_dbContextLazy = new Lazy<MareDbContext>(() => mareDbContextFactory.CreateDbContext());
|
||||
}
|
||||
@@ -133,6 +135,7 @@ public partial class MareHub : Hub<IMareHub>, IMareHub
|
||||
if (exception != null)
|
||||
_logger.LogCallWarning(MareHubLogger.Args(_contextAccessor.GetIpAddress(), exception.Message, exception.StackTrace));
|
||||
|
||||
await GposeLobbyLeave().ConfigureAwait(false);
|
||||
await RemoveUserFromRedis().ConfigureAwait(false);
|
||||
|
||||
await SendOfflineToAllPairedUsers().ConfigureAwait(false);
|
||||
|
||||
@@ -0,0 +1,149 @@
|
||||
using MareSynchronos.API.Dto.CharaData;
|
||||
using MareSynchronos.API.SignalR;
|
||||
using MareSynchronosServer.Hubs;
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using StackExchange.Redis.Extensions.Core.Abstractions;
|
||||
|
||||
namespace MareSynchronosServer.Services;
|
||||
|
||||
public sealed class GPoseLobbyDistributionService : IHostedService, IDisposable
|
||||
{
|
||||
private CancellationTokenSource _runtimeCts = new();
|
||||
private readonly Dictionary<string, Dictionary<string, WorldData>> _lobbyWorldData = [];
|
||||
private readonly Dictionary<string, Dictionary<string, PoseData>> _lobbyPoseData = [];
|
||||
private readonly SemaphoreSlim _lobbyPoseDataModificationSemaphore = new(1, 1);
|
||||
private readonly SemaphoreSlim _lobbyWorldDataModificationSemaphore = new(1, 1);
|
||||
|
||||
public GPoseLobbyDistributionService(ILogger<GPoseLobbyDistributionService> logger, IRedisDatabase redisDb,
|
||||
IHubContext<MareHub, IMareHub> hubContext)
|
||||
{
|
||||
_logger = logger;
|
||||
_redisDb = redisDb;
|
||||
_hubContext = hubContext;
|
||||
}
|
||||
|
||||
private bool _disposed;
|
||||
private readonly ILogger<GPoseLobbyDistributionService> _logger;
|
||||
private readonly IRedisDatabase _redisDb;
|
||||
private readonly IHubContext<MareHub, IMareHub> _hubContext;
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_runtimeCts.Cancel();
|
||||
_runtimeCts.Dispose();
|
||||
_lobbyPoseDataModificationSemaphore.Dispose();
|
||||
_lobbyWorldDataModificationSemaphore.Dispose();
|
||||
|
||||
_disposed = true;
|
||||
}
|
||||
|
||||
public async Task PushWorldData(string lobby, string user, WorldData worldData)
|
||||
{
|
||||
await _lobbyWorldDataModificationSemaphore.WaitAsync().ConfigureAwait(false);
|
||||
if (!_lobbyWorldData.TryGetValue(lobby, out var worldDataDict))
|
||||
{
|
||||
_lobbyWorldData[lobby] = worldDataDict = new(StringComparer.Ordinal);
|
||||
}
|
||||
worldDataDict[user] = worldData;
|
||||
_lobbyWorldDataModificationSemaphore.Release();
|
||||
}
|
||||
|
||||
public async Task PushPoseData(string lobby, string user, PoseData poseData)
|
||||
{
|
||||
await _lobbyPoseDataModificationSemaphore.WaitAsync().ConfigureAwait(false);
|
||||
if (!_lobbyPoseData.TryGetValue(lobby, out var poseDataDict))
|
||||
{
|
||||
_lobbyPoseData[lobby] = poseDataDict = new(StringComparer.Ordinal);
|
||||
}
|
||||
poseDataDict[user] = poseData;
|
||||
_lobbyPoseDataModificationSemaphore.Release();
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_ = WorldDataDistribution(_runtimeCts.Token);
|
||||
_ = PoseDataDistribution(_runtimeCts.Token);
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private async Task WorldDataDistribution(CancellationToken token)
|
||||
{
|
||||
while (!token.IsCancellationRequested)
|
||||
{
|
||||
await DistributeWorldData(token).ConfigureAwait(false);
|
||||
await Task.Delay(TimeSpan.FromSeconds(1), token).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task PoseDataDistribution(CancellationToken token)
|
||||
{
|
||||
while (!token.IsCancellationRequested)
|
||||
{
|
||||
await DistributePoseData(token).ConfigureAwait(false);
|
||||
await Task.Delay(TimeSpan.FromSeconds(10), token).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task DistributeWorldData(CancellationToken token)
|
||||
{
|
||||
await _lobbyWorldDataModificationSemaphore.WaitAsync(token).ConfigureAwait(false);
|
||||
var clone = _lobbyWorldData.ToDictionary(k => k.Key, k => k.Value, StringComparer.Ordinal);
|
||||
_lobbyWorldData.Clear();
|
||||
_lobbyWorldDataModificationSemaphore.Release();
|
||||
foreach (var lobbyId in clone)
|
||||
{
|
||||
token.ThrowIfCancellationRequested();
|
||||
|
||||
if (!lobbyId.Value.Values.Any())
|
||||
continue;
|
||||
|
||||
var gposeLobbyUsers = await _redisDb.GetAsync<List<string>>($"GposeLobby:{lobbyId.Key}").ConfigureAwait(false);
|
||||
if (gposeLobbyUsers == null)
|
||||
continue;
|
||||
|
||||
foreach (var data in lobbyId.Value)
|
||||
{
|
||||
await _hubContext.Clients.Users(gposeLobbyUsers.Where(k => !string.Equals(k, data.Key, StringComparison.Ordinal)))
|
||||
.Client_GposeLobbyPushWorldData(new(data.Key), data.Value).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task DistributePoseData(CancellationToken token)
|
||||
{
|
||||
await _lobbyPoseDataModificationSemaphore.WaitAsync(token).ConfigureAwait(false);
|
||||
var clone = _lobbyPoseData.ToDictionary(k => k.Key, k => k.Value, StringComparer.Ordinal);
|
||||
_lobbyPoseData.Clear();
|
||||
_lobbyPoseDataModificationSemaphore.Release();
|
||||
|
||||
foreach (var lobbyId in clone)
|
||||
{
|
||||
token.ThrowIfCancellationRequested();
|
||||
|
||||
if (!lobbyId.Value.Values.Any())
|
||||
continue;
|
||||
|
||||
var gposeLobbyUsers = await _redisDb.GetAsync<List<string>>($"GposeLobby:{lobbyId.Key}").ConfigureAwait(false);
|
||||
if (gposeLobbyUsers == null)
|
||||
continue;
|
||||
|
||||
foreach (var data in lobbyId.Value)
|
||||
{
|
||||
await _hubContext.Clients.Users(gposeLobbyUsers.Where(k => !string.Equals(k, data.Key, StringComparison.Ordinal)))
|
||||
.Client_GposeLobbyPushPoseData(new(data.Key), data.Value).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_runtimeCts.Cancel();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -99,6 +99,8 @@ public class Startup
|
||||
services.AddHostedService(provider => provider.GetService<UserCleanupService>());
|
||||
services.AddSingleton<CharaDataCleanupService>();
|
||||
services.AddHostedService(provider => provider.GetService<CharaDataCleanupService>());
|
||||
services.AddSingleton<GPoseLobbyDistributionService>();
|
||||
services.AddHostedService(provider => provider.GetService<GPoseLobbyDistributionService>());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -276,6 +278,8 @@ public class Startup
|
||||
MetricsAPI.GaugeGroupPairs,
|
||||
MetricsAPI.GaugeGroupPairsPaused,
|
||||
MetricsAPI.GaugeUsersRegistered,
|
||||
MetricsAPI.GaugeGposeLobbies,
|
||||
MetricsAPI.GaugeGposeLobbyUsers
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
@@ -41,4 +41,6 @@ public class MetricsAPI
|
||||
public const string CounterFileRequests = "mare_files_requests";
|
||||
public const string CounterFileRequestSize = "mare_files_request_size";
|
||||
public const string CounterAccountsCreated = "mare_accounts_created";
|
||||
public const string GaugeGposeLobbies = "mare_gpose_lobbies";
|
||||
public const string GaugeGposeLobbyUsers = "mare_gpose_lobby_users";
|
||||
}
|
||||
Reference in New Issue
Block a user