* add jwt expiry * start of 0.9 api impl * some stuff idk * some more impl * some cleanup * remove grouppair, add configuration, rework some pair drawing stuff * do some stuff * rework some ui * I don't even know anymore * add cancellationtoken * token bla * ui fixes etc * probably individual adding/removing now working fully as expected * add working report popup * I guess it's more syncshell shit or so * popup shit idk * work out most of the syncshell bullshit I guess * delete some old crap * are we actually getting closer to the end * update pair info stuff * more fixes/adjustments, idk * refactor some things * some rework * some more cleanup * cleanup * make menu buttons w i d e * better icon text buttons * add all syncshell folder and ordering fixes --------- Co-authored-by: rootdarkarchon <root.darkarchon@outlook.com>
178 lines
6.4 KiB
C#
178 lines
6.4 KiB
C#
using Microsoft.Extensions.Hosting;
|
|
using Microsoft.Extensions.Logging;
|
|
using System.Collections.Concurrent;
|
|
using System.Text;
|
|
|
|
namespace MareSynchronos.Services.Mediator;
|
|
|
|
/// <summary>
/// In-process message bus. Subscribers register a handler per message type; published messages are
/// either dispatched immediately on the publishing thread (when <c>KeepThreadContext</c> is set) or
/// queued and dispatched by a background loop that is started in <see cref="StartAsync"/>.
/// </summary>
public sealed class MareMediator : IHostedService
{
    /// <summary>Delay between two passes of the background loop over the message queue.</summary>
    private const int QueuePollIntervalMs = 100;

    /// <summary>Minimum time between two logged errors for the same subscription.</summary>
    private static readonly TimeSpan ErrorLogThrottle = TimeSpan.FromSeconds(10);

    private readonly object _addRemoveLock = new();
    // Written from the background loop and from any thread that publishes a KeepThreadContext message,
    // hence a concurrent collection.
    private readonly ConcurrentDictionary<SubscriberAction, DateTime> _lastErrorTime = new();
    private readonly ILogger<MareMediator> _logger;
    private readonly CancellationTokenSource _loopCts = new();
    private readonly ConcurrentQueue<MessageBase> _messageQueue = new();
    private readonly PerformanceCollectorService _performanceCollector;
    // Guarded by _addRemoveLock for every read and write.
    private readonly Dictionary<Type, HashSet<SubscriberAction>> _subscriberDict = [];

    public MareMediator(ILogger<MareMediator> logger, PerformanceCollectorService performanceCollector)
    {
        _logger = logger;
        _performanceCollector = performanceCollector;
    }

    /// <summary>
    /// Logs every distinct subscriber (ordered by full type name) followed by the names of the
    /// message types it is subscribed to.
    /// </summary>
    public void PrintSubscriberInfo()
    {
        const string prefix = "=> ";

        // Snapshot under the lock so concurrent Subscribe/Unsubscribe calls cannot invalidate the enumeration.
        List<(Type MessageType, IMediatorSubscriber[] Subscribers)> snapshot;
        lock (_addRemoveLock)
        {
            snapshot = _subscriberDict
                .Select(kvp => (MessageType: kvp.Key, Subscribers: kvp.Value.Select(v => v.Subscriber).ToArray()))
                .ToList();
        }

        var orderedSubscribers = snapshot
            .SelectMany(entry => entry.Subscribers)
            .Distinct()
            .OrderBy(s => s.GetType().FullName, StringComparer.Ordinal)
            .ToList();

        foreach (var subscriber in orderedSubscribers)
        {
            _logger.LogInformation("Subscriber {type}: {sub}", subscriber.GetType().Name, subscriber.ToString());

            StringBuilder sb = new(prefix);
            foreach (var entry in snapshot.Where(e => e.Subscribers.Any(s => ReferenceEquals(s, subscriber))))
            {
                sb.Append(entry.MessageType.Name).Append(", ");
            }

            if (sb.Length > prefix.Length)
            {
                _logger.LogInformation("{sb}", sb.ToString());
            }

            _logger.LogInformation("---");
        }
    }

    /// <summary>
    /// Publishes a message. Messages with <c>KeepThreadContext</c> set are dispatched synchronously
    /// on the calling thread; all others are queued for the background loop.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public void Publish<T>(T message) where T : MessageBase
    {
        ArgumentNullException.ThrowIfNull(message);

        if (message.KeepThreadContext)
        {
            ExecuteMessage(message);
        }
        else
        {
            _messageQueue.Enqueue(message);
        }
    }

    /// <summary>Starts the background loop that drains the message queue.</summary>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        _logger.LogTrace("Starting MareMediator");

        var loopToken = _loopCts.Token;
        _ = Task.Run(async () =>
        {
            while (!loopToken.IsCancellationRequested)
            {
                try
                {
                    await Task.Delay(QueuePollIntervalMs, loopToken).ConfigureAwait(false);
                    DrainQueue();
                }
                catch (OperationCanceledException)
                {
                    // StopAsync cancelled the delay; leave the loop quietly.
                    break;
                }
                catch (Exception ex)
                {
                    // Keep the loop alive; a single failing pass must not stop message delivery.
                    _logger.LogError(ex, "Error while processing queued mediator messages");
                }
            }
        });

        _logger.LogTrace("Started MareMediator");

        return Task.CompletedTask;
    }

    /// <summary>Stops the background loop and discards all messages still queued.</summary>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        // Cancel first so the loop cannot pick up messages between the clear and the cancellation.
        _loopCts.Cancel();
        _messageQueue.Clear();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Registers <paramref name="action"/> as a handler of <typeparamref name="T"/> on behalf of
    /// <paramref name="subscriber"/>.
    /// </summary>
    /// <exception cref="ArgumentNullException">An argument is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// The same subscriber has already registered an equal handler for <typeparamref name="T"/>.
    /// </exception>
    public void Subscribe<T>(IMediatorSubscriber subscriber, Action<T> action) where T : MessageBase
    {
        ArgumentNullException.ThrowIfNull(subscriber);
        ArgumentNullException.ThrowIfNull(action);

        lock (_addRemoveLock)
        {
            if (!_subscriberDict.TryGetValue(typeof(T), out var actions))
            {
                actions = [];
                _subscriberDict[typeof(T)] = actions;
            }

            // Relies on SubscriberAction's value equality; with reference equality this guard never fired.
            if (!actions.Add(SubscriberAction.Create(subscriber, action)))
            {
                throw new InvalidOperationException("Already subscribed");
            }

            _logger.LogDebug("Subscriber added for message {message}: {sub}", typeof(T).Name, subscriber.GetType().Name);
        }
    }

    /// <summary>Removes every handler <paramref name="subscriber"/> registered for <typeparamref name="T"/>.</summary>
    public void Unsubscribe<T>(IMediatorSubscriber subscriber) where T : MessageBase
    {
        lock (_addRemoveLock)
        {
            if (_subscriberDict.TryGetValue(typeof(T), out var actions))
            {
                RemoveSubscriber(actions, subscriber);
            }
        }
    }

    /// <summary>Removes every handler <paramref name="subscriber"/> registered, for all message types.</summary>
    internal void UnsubscribeAll(IMediatorSubscriber subscriber)
    {
        lock (_addRemoveLock)
        {
            foreach (var (messageType, actions) in _subscriberDict)
            {
                int unSubbed = RemoveSubscriber(actions, subscriber);
                if (unSubbed > 0)
                {
                    _logger.LogDebug("{sub} unsubscribed from {msg}", subscriber.GetType().Name, messageType.Name);
                }
            }
        }
    }

    /// <summary>
    /// Dequeues and dispatches everything currently queued. A message that compares equal to one
    /// already dispatched in the same pass is skipped.
    /// </summary>
    private void DrainQueue()
    {
        HashSet<MessageBase> processedMessages = [];
        while (_messageQueue.TryDequeue(out var message))
        {
            if (!processedMessages.Add(message))
            {
                continue;
            }

            ExecuteMessage(message);
        }
    }

    /// <summary>
    /// Dispatches <paramref name="message"/> to all handlers registered for its exact runtime type.
    /// A failing handler is logged (at most once per <see cref="ErrorLogThrottle"/> per subscription)
    /// and does not prevent the remaining handlers from running.
    /// </summary>
    private void ExecuteMessage(MessageBase message)
    {
        SubscriberAction[] subscribersCopy;
        lock (_addRemoveLock)
        {
            if (!_subscriberDict.TryGetValue(message.GetType(), out var subscribers) || subscribers.Count == 0)
            {
                return;
            }

            // Copy so handlers run outside the lock and may themselves subscribe or unsubscribe.
            subscribersCopy = subscribers.ToArray();
        }

        foreach (SubscriberAction subscriber in subscribersCopy)
        {
            try
            {
                ExecuteSubscriber(subscriber, message);
            }
            catch (Exception ex)
            {
                DateTime now = DateTime.UtcNow;
                if (_lastErrorTime.TryGetValue(subscriber, out var lastErrorTime) && lastErrorTime.Add(ErrorLogThrottle) > now)
                {
                    continue;
                }

                _logger.LogError(ex, "Error executing {type} for subscriber {subscriber}", message.GetType().Name, subscriber.Subscriber.GetType().Name);
                _lastErrorTime[subscriber] = now;
            }
        }
    }

    /// <summary>Runs a single handler through the performance collector.</summary>
    private void ExecuteSubscriber(SubscriberAction subscriber, MessageBase message)
    {
        var isSameThread = message.KeepThreadContext ? "$" : string.Empty;
        _performanceCollector.LogPerformance(this, $"{isSameThread}Execute>{message.GetType().Name}+{subscriber.Subscriber.GetType().Name}>{subscriber.Subscriber}", () => subscriber.Invoke(message));
    }

    /// <summary>
    /// Removes all of <paramref name="subscriber"/>'s entries from <paramref name="actions"/> and
    /// forgets their error-throttle timestamps. Must be called while holding <c>_addRemoveLock</c>.
    /// </summary>
    /// <returns>The number of removed entries.</returns>
    private int RemoveSubscriber(HashSet<SubscriberAction> actions, IMediatorSubscriber subscriber)
    {
        return actions.RemoveWhere(action =>
        {
            if (!ReferenceEquals(action.Subscriber, subscriber))
            {
                return false;
            }

            // Drop the throttle entry too, otherwise it would keep the subscriber reachable.
            _lastErrorTime.TryRemove(action, out _);
            return true;
        });
    }

    /// <summary>
    /// One registered handler. Two instances are equal when they belong to the same subscriber
    /// instance and wrap equal delegates, which is what lets the owning set reject duplicates.
    /// </summary>
    private sealed class SubscriberAction : IEquatable<SubscriberAction>
    {
        private readonly Action<MessageBase> _invoker;

        private SubscriberAction(IMediatorSubscriber subscriber, Delegate handler, Action<MessageBase> invoker)
        {
            Subscriber = subscriber;
            Handler = handler;
            _invoker = invoker;
        }

        /// <summary>The delegate originally passed to Subscribe; used for equality only.</summary>
        public Delegate Handler { get; }

        public IMediatorSubscriber Subscriber { get; }

        /// <summary>Wraps a typed handler so it can later be invoked without reflection.</summary>
        public static SubscriberAction Create<T>(IMediatorSubscriber subscriber, Action<T> action) where T : MessageBase
            => new(subscriber, action, message => action((T)message));

        /// <summary>Invokes the handler; <paramref name="message"/> must be of the subscribed type.</summary>
        public void Invoke(MessageBase message) => _invoker(message);

        public bool Equals(SubscriberAction? other)
            => other is not null && ReferenceEquals(Subscriber, other.Subscriber) && Handler.Equals(other.Handler);

        public override bool Equals(object? obj) => Equals(obj as SubscriberAction);

        public override int GetHashCode()
            => HashCode.Combine(System.Runtime.CompilerServices.RuntimeHelpers.GetHashCode(Subscriber), Handler);
    }
}