potentially fix zoning issues
This commit is contained in:
@@ -1,17 +1,27 @@
|
||||
using MareSynchronos.Utils;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using System.Diagnostics;
|
||||
using System.Text;
|
||||
|
||||
namespace MareSynchronos.Mediator;
|
||||
|
||||
public class MareMediator : IDisposable
|
||||
{
|
||||
private record MediatorSubscriber(IMediatorSubscriber Subscriber, Action<IMessage> Action);
|
||||
private class SubscriberAction
|
||||
{
|
||||
public IMediatorSubscriber Subscriber { get; }
|
||||
public Action<IMessage> Action { get; }
|
||||
|
||||
private readonly Dictionary<Type, HashSet<MediatorSubscriber>> _subscriberDict = new();
|
||||
public SubscriberAction(IMediatorSubscriber subscriber, Action<IMessage> action)
|
||||
{
|
||||
Subscriber = subscriber;
|
||||
Action = action;
|
||||
}
|
||||
}
|
||||
|
||||
private readonly Dictionary<Type, HashSet<SubscriberAction>> _subscriberDict = new();
|
||||
private readonly ILogger<MareMediator> _logger;
|
||||
private readonly PerformanceCollector _performanceCollector;
|
||||
private readonly object _addRemoveLock = new();
|
||||
|
||||
public MareMediator(ILogger<MareMediator> logger, PerformanceCollector performanceCollector)
|
||||
{
|
||||
@@ -21,30 +31,35 @@ public class MareMediator : IDisposable
|
||||
|
||||
public void Subscribe<T>(IMediatorSubscriber subscriber, Action<IMessage> action) where T : IMessage
|
||||
{
|
||||
_subscriberDict.TryAdd(typeof(T), new HashSet<MediatorSubscriber>());
|
||||
|
||||
if (!_subscriberDict[typeof(T)].Add(new(subscriber, action)))
|
||||
lock (_addRemoveLock)
|
||||
{
|
||||
throw new InvalidOperationException("Already subscribed");
|
||||
_subscriberDict.TryAdd(typeof(T), new HashSet<SubscriberAction>());
|
||||
|
||||
if (!_subscriberDict[typeof(T)].Add(new(subscriber, action)))
|
||||
{
|
||||
throw new InvalidOperationException("Already subscribed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void Unsubscribe<T>(IMediatorSubscriber subscriber) where T : IMessage
|
||||
{
|
||||
if (_subscriberDict.TryGetValue(typeof(T), out var subscribers))
|
||||
lock (_addRemoveLock)
|
||||
{
|
||||
subscribers.RemoveWhere(p => p.Subscriber == subscriber);
|
||||
if (_subscriberDict.ContainsKey(typeof(T)))
|
||||
{
|
||||
_subscriberDict[typeof(T)].RemoveWhere(p => p.Subscriber == subscriber);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void Publish(IMessage message)
|
||||
{
|
||||
if (_subscriberDict.TryGetValue(message.GetType(), out var subscribers))
|
||||
if (_subscriberDict.TryGetValue(message.GetType(), out HashSet<SubscriberAction>? subscribers) && subscribers != null && subscribers.Any())
|
||||
{
|
||||
Stopwatch globalStopwatch = Stopwatch.StartNew();
|
||||
_performanceCollector.LogPerformance(this, $"Publish>{message.GetType().Name}", () =>
|
||||
{
|
||||
foreach (var subscriber in subscribers.Where(s => s.Subscriber != null).ToList())
|
||||
foreach (SubscriberAction subscriber in subscribers?.Where(s => s.Subscriber != null).ToHashSet() ?? new HashSet<SubscriberAction>())
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -52,8 +67,11 @@ public class MareMediator : IDisposable
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogCritical(ex, "Error executing {type} for subscriber {subscriber}, removing from Mediator", message.GetType(), subscriber);
|
||||
_subscriberDict[message.GetType()].RemoveWhere(s => s == subscriber);
|
||||
lock (_addRemoveLock)
|
||||
{
|
||||
var removed = _subscriberDict[message.GetType()].RemoveWhere(s => s == subscriber);
|
||||
_logger.LogCritical(ex, "Error executing {type} for subscriber {subscriber}, removed from Mediator: {removeCount}", message.GetType(), subscriber, removed);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -62,11 +80,14 @@ public class MareMediator : IDisposable
|
||||
|
||||
internal void UnsubscribeAll(IMediatorSubscriber subscriber)
|
||||
{
|
||||
foreach (var kvp in _subscriberDict.ToList())
|
||||
lock (_subscriberDict)
|
||||
{
|
||||
var unSubbed = _subscriberDict[kvp.Key].RemoveWhere(p => p.Subscriber == subscriber);
|
||||
if (unSubbed > 0)
|
||||
_logger.LogDebug("{sub} unsubscribed from {msg}", subscriber, kvp.Key.Name);
|
||||
foreach (KeyValuePair<Type, HashSet<SubscriberAction>> kvp in _subscriberDict.ToList())
|
||||
{
|
||||
int unSubbed = _subscriberDict[kvp.Key]?.RemoveWhere(p => p.Subscriber == subscriber) ?? 0;
|
||||
if (unSubbed > 0)
|
||||
_logger.LogDebug("{sub} unsubscribed from {msg}", subscriber, kvp.Key.Name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user