adjustments to internal messaging

This commit is contained in:
rootdarkarchon
2023-04-27 15:10:59 +02:00
parent a909319ae3
commit 4f200f2072
7 changed files with 160 additions and 94 deletions

View File

@@ -1,18 +1,18 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Text;
namespace MareSynchronos.Services.Mediator;
public sealed class MareMediator : IDisposable
public sealed class MareMediator : IHostedService
{
private readonly object _addRemoveLock = new();
private readonly Dictionary<object, DateTime> _lastErrorTime = new();
private readonly ILogger<MareMediator> _logger;
private readonly CancellationTokenSource _loopCts = new();
private readonly ConcurrentQueue<MessageBase> _messageQueue = new();
private readonly PerformanceCollectorService _performanceCollector;
private readonly Dictionary<Type, HashSet<SubscriberAction>> _subscriberDict = new();
public MareMediator(ILogger<MareMediator> logger, PerformanceCollectorService performanceCollector)
@@ -21,13 +21,6 @@ public sealed class MareMediator : IDisposable
_performanceCollector = performanceCollector;
}
public void Dispose()
{
_logger.LogTrace("Disposing {type}", GetType());
_subscriberDict.Clear();
GC.SuppressFinalize(this);
}
public void PrintSubscriberInfo()
{
foreach (var kvp in _subscriberDict.SelectMany(c => c.Value.Select(v => v))
@@ -47,32 +40,52 @@ public sealed class MareMediator : IDisposable
}
}
public void Publish<T>(T message) where T : IMessage
public void Publish<T>(T message) where T : MessageBase
{
if (_subscriberDict.TryGetValue(message.GetType(), out HashSet<SubscriberAction>? subscribers) && subscribers != null && subscribers.Any())
if (message.KeepThreadContext)
{
_performanceCollector.LogPerformance(this, $"Publish>{message.GetType().Name}", () =>
{
foreach (SubscriberAction subscriber in subscribers?.Where(s => s.Subscriber != null).ToHashSet() ?? new HashSet<SubscriberAction>())
{
try
{
_performanceCollector.LogPerformance(this, $"Publish>{message.GetType().Name}+{subscriber.Subscriber.GetType().Name}", () => ((Action<T>)subscriber.Action).Invoke(message));
}
catch (Exception ex)
{
if (_lastErrorTime.TryGetValue(subscriber, out var lastErrorTime) && lastErrorTime.Add(TimeSpan.FromSeconds(10)) > DateTime.UtcNow)
continue;
_logger.LogCritical(ex, "Error executing {type} for subscriber {subscriber}", message.GetType().Name, subscriber.Subscriber.GetType().Name);
_lastErrorTime[subscriber] = DateTime.UtcNow;
}
}
});
ExecuteMessage(message);
}
else
{
_messageQueue.Enqueue(message);
}
}
public void Subscribe<T>(IMediatorSubscriber subscriber, Action<T> action) where T : IMessage
public Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogTrace("Starting MareMediator");
_ = Task.Run(async () =>
{
while (!_loopCts.Token.IsCancellationRequested)
{
await Task.Delay(100, _loopCts.Token).ConfigureAwait(false);
HashSet<MessageBase> processedMessages = new();
while (_messageQueue.TryDequeue(out var message))
{
if (processedMessages.Contains(message)) { continue; }
processedMessages.Add(message);
ExecuteMessage(message);
}
}
});
_logger.LogTrace("Started MareMediator");
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_messageQueue.Clear();
_loopCts.Cancel();
return Task.CompletedTask;
}
public void Subscribe<T>(IMediatorSubscriber subscriber, Action<T> action) where T : MessageBase
{
lock (_addRemoveLock)
{
@@ -87,7 +100,7 @@ public sealed class MareMediator : IDisposable
}
}
public void Unsubscribe<T>(IMediatorSubscriber subscriber) where T : IMessage
public void Unsubscribe<T>(IMediatorSubscriber subscriber) where T : MessageBase
{
lock (_addRemoveLock)
{
@@ -113,6 +126,44 @@ public sealed class MareMediator : IDisposable
}
}
private void ExecuteMessage(MessageBase message)
{
if (_subscriberDict.TryGetValue(message.GetType(), out HashSet<SubscriberAction>? subscribers) && subscribers != null && subscribers.Any())
{
HashSet<SubscriberAction> subscribersCopy = new HashSet<SubscriberAction>();
lock (_addRemoveLock)
{
subscribersCopy = subscribers?.Where(s => s.Subscriber != null).ToHashSet() ?? new HashSet<SubscriberAction>();
}
_performanceCollector.LogPerformance(this, $"Execute>{message.GetType().Name}", () =>
{
foreach (SubscriberAction subscriber in subscribersCopy)
{
try
{
typeof(MareMediator)
.GetMethod(nameof(ExecuteSubscriber), System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)?
.MakeGenericMethod(message.GetType())
.Invoke(this, new object[] { subscriber, message });
}
catch (Exception ex)
{
if (_lastErrorTime.TryGetValue(subscriber, out var lastErrorTime) && lastErrorTime.Add(TimeSpan.FromSeconds(10)) > DateTime.UtcNow)
continue;
_logger.LogCritical(ex, "Error executing {type} for subscriber {subscriber}", message.GetType().Name, subscriber.Subscriber.GetType().Name);
_lastErrorTime[subscriber] = DateTime.UtcNow;
}
}
});
}
}
private void ExecuteSubscriber<T>(SubscriberAction subscriber, T message) where T : MessageBase
{
_performanceCollector.LogPerformance(this, $"Publish>{message.GetType().Name}+{subscriber.Subscriber.GetType().Name}", () => ((Action<T>)subscriber.Action).Invoke(message));
}
private sealed class SubscriberAction
{
public SubscriberAction(IMediatorSubscriber subscriber, object action)