Event Aggregator

Even though the Event Aggregator is one of the lesser-known patterns, its usefulness cannot be ignored. In fact, it makes me wonder why it is not better known.

Let us recap the Observer pattern. One of its problems is that it can grow out of hand when the system has multiple event sources and multiple event subscribers. The subscribers could themselves be sources of other events.

The Event Aggregator, as Martin Fowler describes it, is a single source of truth, or rather in this case a single point of indirection for events. It gathers events from multiple sources and propagates them to interested parties. The Event Aggregator is loosely coupled with both observers and subjects: it sits between them as a centralized message broker and provides a single point of registration for events.

I have been using Caliburn Micro for my WPF applications for a while, and it provides quite a useful implementation of the Event Aggregator. The following section discusses a simplified version of Caliburn Micro’s implementation.

public interface IEventAggregator
{
    void Subscribe(object subscriber);
    void Unsubscribe(object subscriber);
    void PublishMessage(object message);
}

The Event Aggregator exposes three major methods: one each for subscribing and unsubscribing, and another for subjects to publish events. The Subscribe() method could be defined as

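// Weak references to the registered handlers.
private readonly List<WeakReference<Handler>> _handlers = new List<WeakReference<Handler>>();

public void Subscribe(object subscriber)
{
    if (subscriber == null) throw new ArgumentNullException(nameof(subscriber));

    lock (_handlers)
    {
        // Skip subscribers that are already registered. The list stores
        // WeakReference<Handler>, so compare against the wrapped subscriber.
        if (_handlers.Any(x => x.TryGetTarget(out var handler) && ReferenceEquals(handler.Subscriber, subscriber)))
        {
            return;
        }

        _handlers.Add(new WeakReference<Handler>(new Handler(subscriber)));
    }
}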

The Subscribe() method implementation is pretty straightforward. Each time it is invoked, it adds the subscriber instance (passed as a parameter) to a collection, wrapped in a Handler. The collection is maintained as a list of WeakReference instances: being a distant cousin of the Observer pattern, the Event Aggregator also inherits some of the memory leak issues that are evident in the Observer pattern. WeakReference allows us to work around the problem to an extent.

As seen in the code above, the collection is of type Handler (or rather WeakReference<Handler>, to be precise). The Handler type maintains a reference to the subscriber, along with the message types the subscriber is interested in and the corresponding event handlers. We will look at the Handler definition before discussing further.

private class Handler
{
    private readonly object _subscriber;
    private Dictionary<Type, MethodInfo> _handlers = new Dictionary<Type, MethodInfo>();
    public Handler(object subscriber)
    {
        _subscriber = subscriber;

        var interfaces = _subscriber.GetType()
                                    .GetTypeInfo()
                                    .ImplementedInterfaces
                                    .Where(x => x.GetTypeInfo().IsGenericType && x.GetGenericTypeDefinition() == typeof(IHandleAsync<>));

        foreach (var @interface in interfaces)
        {
            var genericTypeArg = @interface.GetTypeInfo().GenericTypeArguments[0];
            var method = @interface.GetRuntimeMethod("HandleAsync", new[] { genericTypeArg });

            _handlers.Add(genericTypeArg, method);
        }

    }

    public object Subscriber => _subscriber;
    public void Handle(Type messageType, object message)
    {
        if (!_handlers.ContainsKey(messageType))
        {
            throw new Exception($"Message type {message} not registered");
        }

        var method = _handlers[messageType];
        method.Invoke(_subscriber, new[] { message });
    }
}


As you can observe in the code above, the Handler type uses reflection to iterate over the interfaces implemented by the subscriber. In particular, the Handler is interested in the generic interface IHandleAsync<>. Let us look at the interface first.

public interface IHandleAsync<T> where T : EventMessageBase
{
    Task HandleAsync(T message);
}

As seen in the code above, the interface has a single method, HandleAsync(), which accepts a single parameter of type T. Subscribers that are interested in messages of type T implement this interface.
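The listings in this post reference two types that are not shown: EventMessageBase, used as the generic constraint, and UserSaysHelloMessage, used later by the subscriber and the publisher. Here is a minimal sketch of what they might look like, assuming only what the later code requires (a constructor taking the sender and the text, and a Message property). The Sender property name is my own assumption.

```csharp
using System;

// Assumed base type for all messages: records which object published the message.
public abstract class EventMessageBase
{
    protected EventMessageBase(object sender)
    {
        Sender = sender ?? throw new ArgumentNullException(nameof(sender));
    }

    // Hypothetical property name; the post only shows the sender being passed in.
    public object Sender { get; }
}

// Message carrying the text that the subscriber prints.
public class UserSaysHelloMessage : EventMessageBase
{
    public UserSaysHelloMessage(object sender, string message) : base(sender)
    {
        Message = message;
    }

    public string Message { get; }
}
```

Keeping the messages immutable (get-only properties) is a design choice of this sketch: the same instance is handed to every subscriber, so no subscriber can alter what the others see.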

Let us now look at the Unsubscribe method.

public void Unsubscribe(object subscriber)
{
    lock (_handlers)
    {
        // Remove the entry for this subscriber, along with any entries
        // whose Handler has already been reclaimed by the GC.
        _handlers.RemoveAll(handler =>
            !handler.TryGetTarget(out var target) ||
            ReferenceEquals(target.Subscriber, subscriber));
    }
}

Not too much to explain there, other than being conscious of thread safety. One other point to note is that the Unsubscribe() method also removes any Handler instances that have been reclaimed by the GC.
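The point about reclaimed handlers deserves a closer look. In the simplified listing, the aggregator's list is the only thing that references each Handler, and it does so weakly, so a Handler may be collected even while its subscriber is still alive. One possible alternative, sketched here as an assumption rather than as the code used in this post, is to keep the wrapper strongly in the list and let the wrapper hold the subscriber weakly. WeakSubscription and its members are hypothetical names.

```csharp
using System;

// Hypothetical wrapper: the aggregator would keep this object strongly,
// while the wrapper keeps the subscriber only weakly.
public sealed class WeakSubscription
{
    private readonly WeakReference<object> _subscriber;

    public WeakSubscription(object subscriber)
    {
        if (subscriber == null) throw new ArgumentNullException(nameof(subscriber));
        _subscriber = new WeakReference<object>(subscriber);
    }

    // Returns false once the subscriber has been garbage collected.
    public bool TryGetSubscriber(out object subscriber) =>
        _subscriber.TryGetTarget(out subscriber);

    // True when this entry wraps the given, still-alive instance.
    public bool Matches(object candidate) =>
        _subscriber.TryGetTarget(out var target) && ReferenceEquals(target, candidate);
}
```

With this arrangement the wrapper lives as long as the aggregator keeps it in the list, and only the subscriber's lifetime is left to the GC. Entries whose TryGetSubscriber() returns false can be pruned during Unsubscribe() or PublishMessage().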

The last bit of code we need to look at in the implementation of EventAggregator is the PublishMessage() method.

public void PublishMessage(object message)
{
    if (message == null) throw new ArgumentNullException(nameof(message));

    var handlersToNotify = new List<Handler>();
    lock (_handlers)
    {

        foreach(var handler in _handlers)
        {
            if(handler.TryGetTarget(out var target))
            {
                handlersToNotify.Add(target);
            }
        }
    }

    foreach (var handler in handlersToNotify)
    {
        handler.Handle(message.GetType(), message);
    }

}

The core functionality of the PublishMessage() method is to accept a message and iterate over the active subscribers. It then asks each Handler instance to check its internal dictionary for a handler registered for the type of the message that was passed as the parameter. This is accomplished using the Handler.Handle() method, which retrieves the associated event handler and invokes it, as seen in the code below (the implementation of Handler.Handle(), which we have already seen above).

public void Handle(Type messageType, object message)
{
    if (!_handlers.ContainsKey(messageType))
    {
        throw new Exception($"Message type {message} not registered");
    }

    var method = _handlers[messageType];
    method.Invoke(_subscriber, new[] { message });
}
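Two details of this dispatch are worth spelling out. Handle() throws when a subscriber has no handler for the message type, and the Task returned by HandleAsync() is not awaited. Below is a minimal, self-contained sketch of a more forgiving variant, under stated assumptions: IHandleAsync<T> is redeclared here without the EventMessageBase constraint so that the block compiles on its own, and Greeting, GreetingHandler and ReflectionDispatch are hypothetical names that do not appear in the original code.

```csharp
using System;
using System.Reflection;
using System.Threading.Tasks;

// Redeclared without the generic constraint to keep the sketch standalone.
public interface IHandleAsync<T>
{
    Task HandleAsync(T message);
}

public sealed class Greeting
{
    public string Text { get; set; }
}

public sealed class GreetingHandler : IHandleAsync<Greeting>
{
    public string LastText { get; private set; }

    public Task HandleAsync(Greeting message)
    {
        LastText = message.Text;
        return Task.CompletedTask;
    }
}

public static class ReflectionDispatch
{
    // Returns true if the target handled the message, and false if the target
    // does not implement IHandleAsync<T> for the runtime type of the message.
    public static async Task<bool> TryDispatchAsync(object target, object message)
    {
        if (target == null) throw new ArgumentNullException(nameof(target));
        if (message == null) throw new ArgumentNullException(nameof(message));

        // Build the closed interface type, for example IHandleAsync<Greeting>.
        Type handlerInterface = typeof(IHandleAsync<>).MakeGenericType(message.GetType());
        if (!handlerInterface.IsInstanceOfType(target))
        {
            return false;
        }

        MethodInfo method = handlerInterface.GetMethod("HandleAsync");

        // Invoke returns the handler's Task boxed as object; await it so that
        // the caller observes completion and any exception it carries.
        var task = (Task)method.Invoke(target, new[] { message });
        await task;
        return true;
    }
}
```

A publisher could call TryDispatchAsync() for each live subscriber and simply skip those that return false, which lets subscribers of different message types share one aggregator. Note that with the constrained interface used in this post, MakeGenericType() would throw for a message type that does not derive from EventMessageBase.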

That completes our EventAggregator. Now it is time to see the EventAggregator in action and how subscribers subscribe themselves.

public class SubscriberAlpha:IHandleAsync<UserSaysHelloMessage>
{
    public SubscriberAlpha(IEventAggregator eventAggregator)
    {
        eventAggregator.Subscribe(this);
    }

    public Task HandleAsync(UserSaysHelloMessage message)
    {
        Console.WriteLine($"Message : {message.Message}");
        return Task.CompletedTask;
    }

}

As seen in the code above, the subscriber has done a couple of things here. First, it has implemented the IHandleAsync<> interface with the generic parameter UserSaysHelloMessage. The generic parameter type, in this case UserSaysHelloMessage, is the type of message this particular subscriber is interested in.

It also uses the instance of IEventAggregator to subscribe to the centralized Event Aggregator. The HandleAsync(UserSaysHelloMessage message) method is called by the Event Aggregator whenever a publisher publishes a message of that type.

Let us write some client code to see how our little Event Aggregator example works.

var eventAggregator = new EventAggregator();
var subscriber = new SubscriberAlpha(eventAggregator);
var publisher = new PublisherAlpha(eventAggregator);

publisher.PublishMessage("John says hello");

eventAggregator.Unsubscribe(subscriber);

publisher.PublishMessage("John says hello");

Where the PublisherAlpha is defined as

public class PublisherAlpha
{
    private IEventAggregator _aggregator;
    public PublisherAlpha(IEventAggregator eventAggregator)
    {
        _aggregator = eventAggregator;
    }

    public void PublishMessage(string message)
    {
        _aggregator.PublishMessage(new UserSaysHelloMessage(this,message));
    }
}

As expected our output would contain only a single Message.

Message : John says hello

By the time the publisher sends the second message, the subscriber has already unsubscribed from the central message broker. Hence, the second message is not delivered to the subscriber.

While I like Caliburn Micro's implementation of the Event Aggregator, one thing I would like to tweak is to give the subscriber the ability to subscribe to and unsubscribe from individual message types at runtime. In the next blog post, we will look into another Event Aggregator implementation, which addresses this problem (or rather, wish).

If you would like to have a look at the complete code sample shown here, you can access it on my GitHub.

Caliburn.Micro #006 : Event Aggregators & Window Managers

Consider the classic scenario where you need to show the name of the currently logged-in user in your main window once you have successfully logged in. The login window is supposed to be a modal dialog, and it isn’t remotely aware of the Label displaying the user name in the main window.

Caliburn.Micro handles modal dialogs and messaging between views using Window Managers and Event Aggregators.

Event Aggregators allow a loosely coupled message passing mechanism between different View Models in the system. As the Caliburn Micro documentation states, an Event Aggregator is a service that provides the ability to publish an object from one entity to another in a loosely based fashion.

Let’s begin by creating our sample application using the Caliburn.Micro Template Pack, which makes it easier to kick-start a Caliburn.Micro based application. The complete sample code described in this post is available on GitHub.

We will be using MEF as our IoC container, so let’s make changes to the Bootstrapper to accommodate MEF. For the sake of ease, we will remove the existing Bootstrapper and add the MEF template supplied along with the Caliburn.Micro Template Pack.

 

We will now add our contract interface and the required LoginView, as well as our main window (we will call it ShellView). The skeleton of the code now looks like the following.
ShellView.Xaml
 <Grid>
        <Grid.ColumnDefinitions>
            <ColumnDefinition Width="*"></ColumnDefinition>
        </Grid.ColumnDefinitions>
        <Grid.RowDefinitions>
            <RowDefinition Height="*"></RowDefinition>
            <RowDefinition Height="50"></RowDefinition>
        </Grid.RowDefinitions>
        
        <StackPanel Grid.Row="0" HorizontalAlignment="Center" VerticalAlignment="Center">
            <Label x:Name="UserName" FontWeight="Bold" FontSize="32"></Label>
        </StackPanel>
        
        <StackPanel Grid.Row="1" Margin="10,10,10,10">
            <Button Margin="5,5,5,5" cal:Message.Attach="[Event Click]=[Action PromptForLogin]">Login</Button>
        </StackPanel>
    </Grid> 

IShell Interface

    public interface IShell
    {

    }

ShellViewModel

  [Export(typeof(IShell))]
    public class ShellViewModel : PropertyChangedBase, IShell
    {
        private string _userName  = default;
        public string UserName
        {
            get => _userName;
            set
            {
                _userName = value;
                NotifyOfPropertyChange(nameof(UserName));
            }
        }

        public void PromptForLogin() { }

    }
We will add the skeleton code for Login Window as well before adding the code to invoke it as a separate Window (not Content Control).
LoginView.Xaml
 <Grid>
        <Grid.ColumnDefinitions>
            <ColumnDefinition></ColumnDefinition>
        </Grid.ColumnDefinitions>
        <Grid.RowDefinitions>
            <RowDefinition Height="20"></RowDefinition>
            <RowDefinition Height="30"></RowDefinition>
            <RowDefinition Height="30"></RowDefinition>
            <RowDefinition Height="30"></RowDefinition>
            <RowDefinition Height="20"></RowDefinition>
        </Grid.RowDefinitions>
        
        <StackPanel HorizontalAlignment="Stretch" VerticalAlignment="Center" Background="Lavender">
            <Label>User Login</Label>
        </StackPanel>
        <TextBox Margin="5,5,5,5" Grid.Row="1" Name="UserName">UserName</TextBox>
        <TextBox Margin="5,5,5,5" Grid.Row="2" Name="Password">Password</TextBox>
        <Button Grid.Row="3" Margin="5,5,5,5" cal:Message.Attach="[Event Click]=[Action Validate(UserName,Password)]">Login</Button>
    </Grid>

ILogin Interface

public interface ILogin
{
    bool Validate(string userName, string passWord);
}

LoginViewModel

[Export(typeof(ILogin))]
public class LoginViewModel : ILogin
{
    // Since this is a demo, we will always return true
    public bool Validate(string userName, string passWord) => true;

}

Alright, so we have our barebone code ready. Time to link up the chain. The first task is to ensure we have ShellView as our primary start up Window, which we set by altering the code in our Bootstrapper.

protected override void OnStartup(object sender, StartupEventArgs e)
{
    DisplayRootViewFor<IShell>();
}

The next task is to ensure the Login window is shown when the user clicks the Login button. We do this by utilizing the Window Manager, which is injected (via dependency injection) into ShellViewModel. We will add a constructor for ShellViewModel decorated with the ImportingConstructor attribute, and inject LoginViewModel into the class as well. We will then use the WindowManager to show the LoginViewModel as a modal dialog. Our constructor and PromptForLogin method now look like the following.

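private readonly IWindowManager _windowManager;
private readonly ILogin _loginWindow;

[ImportingConstructor]
public ShellViewModel(IWindowManager windowManager, ILogin loginWindow)
{
    // Keep the injected services so that PromptForLogin can use them.
    _windowManager = windowManager;
    _loginWindow = loginWindow;
}

public void PromptForLogin() => _windowManager.ShowDialog(_loginWindow);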

This opens up the Login window for you. All good so far: the Login window is displayed, you type in the user name and password, and you click the Login button to invoke the Validate method. But how do you pass the user name to the ShellViewModel? This is where the Event Aggregator comes into play.

Event Aggregators allow View Models to pass (broadcast) information (in a model) to any other View Models that have subscribed to the event. We will begin by creating the model that contains the information to be passed along by the Event Aggregator.

public class UserInfoModel
{
     public string UserName { get; set; }
}

We will then inject an instance of EventAggregator in the LoginViewModel. We will also modify our Validate Method a bit. The LoginViewModel class now looks like following.

[Export(typeof(ILogin))]
public class LoginViewModel : ILogin
{
    private IEventAggregator _eventAggregator;

    [ImportingConstructor]
    public LoginViewModel(IEventAggregator eventAggregator)
    {
        _eventAggregator = eventAggregator;
        _eventAggregator.Subscribe(this);
    }

    // Since this is a demo, we will always return true
    public bool Validate(string userName, string passWord)
    {
        _eventAggregator.PublishOnUIThread(new UserInfoModel { UserName = userName });
        return true;
    }
}

As you can see, we have used the PublishOnUIThread method of the Event Aggregator to publish the UserInfoModel object to anyone listening. We will now move to our ShellViewModel and ensure the class is listening. We will also see how to access the message sent by the LoginViewModel.

Let’s begin by modifying the ShellViewModel to have the EventAggregator injected. Our constructor now looks like the following.

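[ImportingConstructor]
public ShellViewModel(IWindowManager windowManager, ILogin loginWindow, IEventAggregator eventAggregator)
{
    _windowManager = windowManager;
    _loginWindow = loginWindow;

    // Register this view model so that it receives published messages.
    eventAggregator.Subscribe(this);
    windowManager.ShowDialog(loginWindow);
}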

Now comes the most important part: we need to implement the IHandle<T> interface, where T is the model of the event object, which in our case is UserInfoModel. The interface contains a single method, Handle, which we will implement soon. Prior to that, this is how our class declaration looks now.

public class ShellViewModel : PropertyChangedBase, IShell, IHandle<UserInfoModel>

The Handle method of the IHandle<UserInfoModel> interface looks like the following.

public void Handle(UserInfoModel message)
{
    this.UserName = message.UserName;
}

As you can see, we have assigned the UserName property of the View Model from the message passed by the EventAggregator. That’s all we need to do. The EventAggregator passes the message to all subscribers on the message bus for that particular event model, and we capture it by implementing the Handle method of the IHandle<T> interface.