diff --git a/MareSynchronos/MarePlugin.cs b/MareSynchronos/MarePlugin.cs index ff3071e..fee157e 100644 --- a/MareSynchronos/MarePlugin.cs +++ b/MareSynchronos/MarePlugin.cs @@ -77,7 +77,6 @@ public class MarePlugin : MediatorSubscriberBase, IHostedService public MarePlugin(ILogger logger, MareConfigService mareConfigService, ServerConfigurationManager serverConfigurationManager, DalamudUtilService dalamudUtil, - ConfigurationMigrator configurationMigrator, IServiceScopeFactory serviceScopeFactory, MareMediator mediator) : base(logger, mediator) { _mareConfigService = mareConfigService; diff --git a/MareSynchronos/Plugin.cs b/MareSynchronos/Plugin.cs index 71a65a2..b712224 100644 --- a/MareSynchronos/Plugin.cs +++ b/MareSynchronos/Plugin.cs @@ -149,6 +149,7 @@ public sealed class Plugin : IDalamudPlugin s.GetRequiredService(), s.GetRequiredService(), s.GetRequiredService(), s.GetRequiredService(), pluginInterface, s.GetRequiredService(), s.GetRequiredService(), s.GetRequiredService())); + collection.AddHostedService(p => p.GetRequiredService()); collection.AddHostedService(p => p.GetRequiredService()); collection.AddHostedService(p => p.GetRequiredService()); collection.AddHostedService(p => p.GetRequiredService()); diff --git a/MareSynchronos/Services/DalamudUtilService.cs b/MareSynchronos/Services/DalamudUtilService.cs index 3b679f1..9259671 100644 --- a/MareSynchronos/Services/DalamudUtilService.cs +++ b/MareSynchronos/Services/DalamudUtilService.cs @@ -147,7 +147,7 @@ public class DalamudUtilService : IHostedService { if (!_framework.IsInFrameworkUpdateThread) { - _logger.LogTrace("Running Action on framework thread (FrameworkContext: {ctx}): {member} in {file}:{line}", _framework.IsInFrameworkUpdateThread, callerMember, callerFilePath, lineNumber); + //_logger.LogTrace("Running Action on framework thread (FrameworkContext: {ctx}): {member} in {file}:{line}", _framework.IsInFrameworkUpdateThread, callerMember, callerFilePath, lineNumber); await _framework.RunOnFrameworkThread(act).ContinueWith((_) => Task.CompletedTask).ConfigureAwait(false); while (_framework.IsInFrameworkUpdateThread) // yield the thread again, should technically never be triggered @@ -165,7 +165,7 @@ public class DalamudUtilService : IHostedService { if (!_framework.IsInFrameworkUpdateThread) { - _logger.LogTrace("Running Func on framework thread (FrameworkContext: {ctx}): {member} in {file}:{line}", _framework.IsInFrameworkUpdateThread, callerMember, callerFilePath, lineNumber); + //_logger.LogTrace("Running Func on framework thread (FrameworkContext: {ctx}): {member} in {file}:{line}", _framework.IsInFrameworkUpdateThread, callerMember, callerFilePath, lineNumber); var result = await _framework.RunOnFrameworkThread(func).ContinueWith((task) => task.Result).ConfigureAwait(false); while (_framework.IsInFrameworkUpdateThread) // yield the thread again, should technically never be triggered diff --git a/MareSynchronos/Services/Mediator/IMessage.cs b/MareSynchronos/Services/Mediator/IMessage.cs deleted file mode 100644 index d7306aa..0000000 --- a/MareSynchronos/Services/Mediator/IMessage.cs +++ /dev/null @@ -1,3 +0,0 @@ -namespace MareSynchronos.Services.Mediator; - -public interface IMessage { } \ No newline at end of file diff --git a/MareSynchronos/Services/Mediator/MareMediator.cs b/MareSynchronos/Services/Mediator/MareMediator.cs index 8f2e540..b6e7843 100644 --- a/MareSynchronos/Services/Mediator/MareMediator.cs +++ b/MareSynchronos/Services/Mediator/MareMediator.cs @@ -1,18 +1,18 @@ -using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System.Collections.Concurrent; using System.Text; namespace MareSynchronos.Services.Mediator; -public sealed class MareMediator : IDisposable +public sealed class MareMediator : IHostedService { private readonly object _addRemoveLock = new(); - private readonly Dictionary _lastErrorTime = new(); - private readonly ILogger _logger; - + private readonly CancellationTokenSource _loopCts = new(); + private readonly ConcurrentQueue _messageQueue = new(); private readonly PerformanceCollectorService _performanceCollector; - private readonly Dictionary> _subscriberDict = new(); public MareMediator(ILogger logger, PerformanceCollectorService performanceCollector) @@ -21,13 +21,6 @@ public sealed class MareMediator : IDisposable _performanceCollector = performanceCollector; } - public void Dispose() - { - _logger.LogTrace("Disposing {type}", GetType()); - _subscriberDict.Clear(); - GC.SuppressFinalize(this); - } - public void PrintSubscriberInfo() { foreach (var kvp in _subscriberDict.SelectMany(c => c.Value.Select(v => v)) @@ -47,32 +40,52 @@ public sealed class MareMediator : IDisposable } } - public void Publish(T message) where T : IMessage + public void Publish(T message) where T : MessageBase { - if (_subscriberDict.TryGetValue(message.GetType(), out HashSet? subscribers) && subscribers != null && subscribers.Any()) + if (message.KeepThreadContext) { - _performanceCollector.LogPerformance(this, $"Publish>{message.GetType().Name}", () => - { - foreach (SubscriberAction subscriber in subscribers?.Where(s => s.Subscriber != null).ToHashSet() ?? new HashSet()) - { - try - { - _performanceCollector.LogPerformance(this, $"Publish>{message.GetType().Name}+{subscriber.Subscriber.GetType().Name}", () => ((Action)subscriber.Action).Invoke(message)); - } - catch (Exception ex) - { - if (_lastErrorTime.TryGetValue(subscriber, out var lastErrorTime) && lastErrorTime.Add(TimeSpan.FromSeconds(10)) > DateTime.UtcNow) - continue; - - _logger.LogCritical(ex, "Error executing {type} for subscriber {subscriber}", message.GetType().Name, subscriber.Subscriber.GetType().Name); - _lastErrorTime[subscriber] = DateTime.UtcNow; - } - } - }); + ExecuteMessage(message); + } + else + { + _messageQueue.Enqueue(message); } } - public void Subscribe(IMediatorSubscriber subscriber, Action action) where T : IMessage + public Task StartAsync(CancellationToken cancellationToken) + { + _logger.LogTrace("Starting MareMediator"); + + _ = Task.Run(async () => + { + while (!_loopCts.Token.IsCancellationRequested) + { + await Task.Delay(100, _loopCts.Token).ConfigureAwait(false); + + HashSet processedMessages = new(); + while (_messageQueue.TryDequeue(out var message)) + { + if (processedMessages.Contains(message)) { continue; } + processedMessages.Add(message); + + ExecuteMessage(message); + } + } + }); + + _logger.LogTrace("Started MareMediator"); + + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + _messageQueue.Clear(); + _loopCts.Cancel(); + return Task.CompletedTask; + } + + public void Subscribe(IMediatorSubscriber subscriber, Action action) where T : MessageBase { lock (_addRemoveLock) { @@ -87,7 +100,7 @@ public sealed class MareMediator : IDisposable } } - public void Unsubscribe(IMediatorSubscriber subscriber) where T : IMessage + public void Unsubscribe(IMediatorSubscriber subscriber) where T : MessageBase { lock (_addRemoveLock) { @@ -113,6 +126,44 @@ public sealed class MareMediator : IDisposable } } + private void ExecuteMessage(MessageBase message) + { + if (_subscriberDict.TryGetValue(message.GetType(), out HashSet? subscribers) && subscribers != null && subscribers.Any()) + { + HashSet subscribersCopy = new HashSet(); + lock (_addRemoveLock) + { + subscribersCopy = subscribers?.Where(s => s.Subscriber != null).ToHashSet() ?? new HashSet(); + } + _performanceCollector.LogPerformance(this, $"Execute>{message.GetType().Name}", () => + { + foreach (SubscriberAction subscriber in subscribersCopy) + { + try + { + typeof(MareMediator) + .GetMethod(nameof(ExecuteSubscriber), System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)? + .MakeGenericMethod(message.GetType()) + .Invoke(this, new object[] { subscriber, message }); + } + catch (Exception ex) + { + if (_lastErrorTime.TryGetValue(subscriber, out var lastErrorTime) && lastErrorTime.Add(TimeSpan.FromSeconds(10)) > DateTime.UtcNow) + continue; + + _logger.LogCritical(ex, "Error executing {type} for subscriber {subscriber}", message.GetType().Name, subscriber.Subscriber.GetType().Name); + _lastErrorTime[subscriber] = DateTime.UtcNow; + } + } + }); + } + } + + private void ExecuteSubscriber(SubscriberAction subscriber, T message) where T : MessageBase + { + _performanceCollector.LogPerformance(this, $"Publish>{message.GetType().Name}+{subscriber.Subscriber.GetType().Name}", () => ((Action)subscriber.Action).Invoke(message)); + } + private sealed class SubscriberAction { public SubscriberAction(IMediatorSubscriber subscriber, object action) diff --git a/MareSynchronos/Services/Mediator/MessageBase.cs b/MareSynchronos/Services/Mediator/MessageBase.cs new file mode 100644 index 0000000..012e36a --- /dev/null +++ b/MareSynchronos/Services/Mediator/MessageBase.cs @@ -0,0 +1,6 @@ +namespace MareSynchronos.Services.Mediator; + +public abstract record MessageBase +{ + public virtual bool KeepThreadContext => false; +} \ No newline at end of file diff --git a/MareSynchronos/Services/Mediator/Messages.cs b/MareSynchronos/Services/Mediator/Messages.cs index 544d761..dccfcc3 100644 --- a/MareSynchronos/Services/Mediator/Messages.cs +++ b/MareSynchronos/Services/Mediator/Messages.cs @@ -10,59 +10,71 @@ using System.Numerics; namespace MareSynchronos.Services.Mediator; #pragma warning disable MA0048 // File name must match type name -public record SwitchToIntroUiMessage : IMessage; -public record SwitchToMainUiMessage : IMessage; -public record OpenSettingsUiMessage : IMessage; -public record DalamudLoginMessage : IMessage; -public record DalamudLogoutMessage : IMessage; -public record FrameworkUpdateMessage : IMessage; -public record ClassJobChangedMessage(uint? ClassJob) : IMessage; -public record DelayedFrameworkUpdateMessage : IMessage; -public record ZoneSwitchStartMessage : IMessage; -public record ZoneSwitchEndMessage : IMessage; -public record CutsceneStartMessage : IMessage; -public record GposeStartMessage : IMessage; -public record GposeEndMessage : IMessage; -public record CutsceneEndMessage : IMessage; -public record CutsceneFrameworkUpdateMessage : IMessage; -public record ConnectedMessage(ConnectionDto Connection) : IMessage; -public record DisconnectedMessage : IMessage; -public record PenumbraModSettingChangedMessage : IMessage; -public record PenumbraInitializedMessage : IMessage; -public record PenumbraDisposedMessage : IMessage; -public record PenumbraRedrawMessage(IntPtr Address, int ObjTblIdx, bool WasRequested) : IMessage; -public record HeelsOffsetMessage : IMessage; -public record PenumbraResourceLoadMessage(IntPtr GameObject, string GamePath, string FilePath) : IMessage; -public record CustomizePlusMessage : IMessage; -public record PalettePlusMessage(Character Character) : IMessage; -public record HonorificMessage(string NewHonorificTitle) : IMessage; -public record PlayerChangedMessage(API.Data.CharacterData Data) : IMessage; -public record CharacterChangedMessage(GameObjectHandler GameObjectHandler) : IMessage; -public record TransientResourceChangedMessage(IntPtr Address) : IMessage; -public record AddWatchedGameObjectHandler(GameObjectHandler Handler) : IMessage; -public record RemoveWatchedGameObjectHandler(GameObjectHandler Handler) : IMessage; -public record HaltScanMessage(string Source) : IMessage; -public record ResumeScanMessage(string Source) : IMessage; +public record SwitchToIntroUiMessage : MessageBase; +public record SwitchToMainUiMessage : MessageBase; +public record OpenSettingsUiMessage : MessageBase; +public record DalamudLoginMessage : MessageBase; +public record DalamudLogoutMessage : MessageBase; +public record FrameworkUpdateMessage : MessageBase +{ + public override bool KeepThreadContext => true; +} +public record ClassJobChangedMessage(uint? ClassJob) : MessageBase; +public record DelayedFrameworkUpdateMessage : MessageBase +{ + public override bool KeepThreadContext => true; +} +public record ZoneSwitchStartMessage : MessageBase; +public record ZoneSwitchEndMessage : MessageBase; +public record CutsceneStartMessage : MessageBase; +public record GposeStartMessage : MessageBase; +public record GposeEndMessage : MessageBase; +public record CutsceneEndMessage : MessageBase; +public record CutsceneFrameworkUpdateMessage : MessageBase +{ + public override bool KeepThreadContext => true; +} +public record ConnectedMessage(ConnectionDto Connection) : MessageBase; +public record DisconnectedMessage : MessageBase; +public record PenumbraModSettingChangedMessage : MessageBase; +public record PenumbraInitializedMessage : MessageBase; +public record PenumbraDisposedMessage : MessageBase; +public record PenumbraRedrawMessage(IntPtr Address, int ObjTblIdx, bool WasRequested) : MessageBase; +public record HeelsOffsetMessage : MessageBase; +public record PenumbraResourceLoadMessage(IntPtr GameObject, string GamePath, string FilePath) : MessageBase +{ + public override bool KeepThreadContext => true; +} +public record CustomizePlusMessage : MessageBase; +public record PalettePlusMessage(Character Character) : MessageBase; +public record HonorificMessage(string NewHonorificTitle) : MessageBase; +public record PlayerChangedMessage(API.Data.CharacterData Data) : MessageBase; +public record CharacterChangedMessage(GameObjectHandler GameObjectHandler) : MessageBase; +public record TransientResourceChangedMessage(IntPtr Address) : MessageBase; +public record AddWatchedGameObjectHandler(GameObjectHandler Handler) : MessageBase; +public record RemoveWatchedGameObjectHandler(GameObjectHandler Handler) : MessageBase; +public record HaltScanMessage(string Source) : MessageBase; +public record ResumeScanMessage(string Source) : MessageBase; public record NotificationMessage - (string Title, string Message, NotificationType Type, uint TimeShownOnScreen = 3000) : IMessage; -public record CreateCacheForObjectMessage(GameObjectHandler ObjectToCreateFor) : IMessage; -public record ClearCacheForObjectMessage(GameObjectHandler ObjectToCreateFor) : IMessage; -public record CharacterDataCreatedMessage(API.Data.CharacterData CharacterData) : IMessage; -public record PenumbraStartRedrawMessage(IntPtr Address) : IMessage; -public record PenumbraEndRedrawMessage(IntPtr Address) : IMessage; -public record HubReconnectingMessage(Exception? Exception) : IMessage; -public record HubReconnectedMessage(string? Arg) : IMessage; -public record HubClosedMessage(Exception? Exception) : IMessage; -public record DownloadReadyMessage(Guid RequestId) : IMessage; -public record DownloadStartedMessage(GameObjectHandler DownloadId, Dictionary DownloadStatus) : IMessage; -public record DownloadFinishedMessage(GameObjectHandler DownloadId) : IMessage; -public record UiToggleMessage(Type UiType) : IMessage; -public record PlayerUploadingMessage(GameObjectHandler Handler, bool IsUploading) : IMessage; -public record ClearProfileDataMessage(UserData? UserData = null) : IMessage; -public record CyclePauseMessage(UserData UserData) : IMessage; -public record ProfilePopoutToggle(Pair? Pair) : IMessage; -public record CompactUiChange(Vector2 Size, Vector2 Position) : IMessage; -public record ProfileOpenStandaloneMessage(Pair Pair) : IMessage; -public record RemoveWindowMessage(WindowMediatorSubscriberBase Window) : IMessage; + (string Title, string Message, NotificationType Type, uint TimeShownOnScreen = 3000) : MessageBase; +public record CreateCacheForObjectMessage(GameObjectHandler ObjectToCreateFor) : MessageBase; +public record ClearCacheForObjectMessage(GameObjectHandler ObjectToCreateFor) : MessageBase; +public record CharacterDataCreatedMessage(API.Data.CharacterData CharacterData) : MessageBase; +public record PenumbraStartRedrawMessage(IntPtr Address) : MessageBase; +public record PenumbraEndRedrawMessage(IntPtr Address) : MessageBase; +public record HubReconnectingMessage(Exception? Exception) : MessageBase; +public record HubReconnectedMessage(string? Arg) : MessageBase; +public record HubClosedMessage(Exception? Exception) : MessageBase; +public record DownloadReadyMessage(Guid RequestId) : MessageBase; +public record DownloadStartedMessage(GameObjectHandler DownloadId, Dictionary DownloadStatus) : MessageBase; +public record DownloadFinishedMessage(GameObjectHandler DownloadId) : MessageBase; +public record UiToggleMessage(Type UiType) : MessageBase; +public record PlayerUploadingMessage(GameObjectHandler Handler, bool IsUploading) : MessageBase; +public record ClearProfileDataMessage(UserData? UserData = null) : MessageBase; +public record CyclePauseMessage(UserData UserData) : MessageBase; +public record ProfilePopoutToggle(Pair? Pair) : MessageBase; +public record CompactUiChange(Vector2 Size, Vector2 Position) : MessageBase; +public record ProfileOpenStandaloneMessage(Pair Pair) : MessageBase; +public record RemoveWindowMessage(WindowMediatorSubscriberBase Window) : MessageBase; #pragma warning restore MA0048 // File name must match type name \ No newline at end of file