w/e
This commit is contained in:
@@ -128,31 +128,30 @@ public sealed class MareMediator : IHostedService
|
|||||||
|
|
||||||
private void ExecuteMessage(MessageBase message)
|
private void ExecuteMessage(MessageBase message)
|
||||||
{
|
{
|
||||||
if (_subscriberDict.TryGetValue(message.GetType(), out HashSet<SubscriberAction>? subscribers) && subscribers != null && subscribers.Any())
|
if (!_subscriberDict.TryGetValue(message.GetType(), out HashSet<SubscriberAction>? subscribers) || subscribers == null || !subscribers.Any()) return;
|
||||||
|
|
||||||
|
HashSet<SubscriberAction> subscribersCopy = new HashSet<SubscriberAction>();
|
||||||
|
lock (_addRemoveLock)
|
||||||
{
|
{
|
||||||
HashSet<SubscriberAction> subscribersCopy = new HashSet<SubscriberAction>();
|
subscribersCopy = subscribers?.Where(s => s.Subscriber != null).ToHashSet() ?? new HashSet<SubscriberAction>();
|
||||||
lock (_addRemoveLock)
|
}
|
||||||
|
|
||||||
|
foreach (SubscriberAction subscriber in subscribersCopy)
|
||||||
|
{
|
||||||
|
try
|
||||||
{
|
{
|
||||||
subscribersCopy = subscribers?.Where(s => s.Subscriber != null).ToHashSet() ?? new HashSet<SubscriberAction>();
|
typeof(MareMediator)
|
||||||
|
.GetMethod(nameof(ExecuteSubscriber), System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)?
|
||||||
|
.MakeGenericMethod(message.GetType())
|
||||||
|
.Invoke(this, new object[] { subscriber, message });
|
||||||
}
|
}
|
||||||
|
catch (Exception ex)
|
||||||
foreach (SubscriberAction subscriber in subscribersCopy)
|
|
||||||
{
|
{
|
||||||
try
|
if (_lastErrorTime.TryGetValue(subscriber, out var lastErrorTime) && lastErrorTime.Add(TimeSpan.FromSeconds(10)) > DateTime.UtcNow)
|
||||||
{
|
continue;
|
||||||
typeof(MareMediator)
|
|
||||||
.GetMethod(nameof(ExecuteSubscriber), System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)?
|
|
||||||
.MakeGenericMethod(message.GetType())
|
|
||||||
.Invoke(this, new object[] { subscriber, message });
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
if (_lastErrorTime.TryGetValue(subscriber, out var lastErrorTime) && lastErrorTime.Add(TimeSpan.FromSeconds(10)) > DateTime.UtcNow)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
_logger.LogCritical(ex, "Error executing {type} for subscriber {subscriber}", message.GetType().Name, subscriber.Subscriber.GetType().Name);
|
_logger.LogCritical(ex, "Error executing {type} for subscriber {subscriber}", message.GetType().Name, subscriber.Subscriber.GetType().Name);
|
||||||
_lastErrorTime[subscriber] = DateTime.UtcNow;
|
_lastErrorTime[subscriber] = DateTime.UtcNow;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user