CQRS/ES #4 Events and Event Store

So far we have learned about the concept of Command Query Responsibility Segregation (CQRS) and Event Sourcing (ES). We also implemented the domain objects that make up an Aggregate, and we expressed the user’s intentions using Commands executed by Command Handlers. Today we’ll discover the role of events and, more importantly, introduce Event Sourcing to our Awesome Calendar project. One more thing: as I mentioned in one of the previous parts, I’m still working on this project, which means that some code may change in the future. Therefore, starting today, I’ll list any “breaking changes” at the beginning of each part, just so you don’t get confused.


Breaking changes alert

  • The Handle method in the ICommandHandler interface is now asynchronous. Therefore, its implementation in CreateCalendarItemCommandHandler is also asynchronous.



In general, an event represents something that took place in the past in our app. It might be a property change (remember the INotifyPropertyChanged interface in WPF?), a database insertion, or an operation failure. In short, compared to a command, an event describes something that has already happened in our system (not something that should be performed), and it’s generated by our application (not by the user). The key question is whether introducing events into our CQRS application is equivalent to introducing ES. The answer is… no. As described in the first part, the purpose of ES is to reconstruct domain objects from stored events, but the events themselves are only “data storage”, like any other object. In fact, events are part of the CQRS pattern, and we will use them as the mechanism for synchronizing our two databases (the Event Store and the Read-Side database). I think we are now ready to code something. First, we need an interface for our events:


public interface IEvent
{
    Guid AggregateId { get; }
    DateTime CreatedDate { get; }
}


It contains only two properties. AggregateId identifies which aggregate generated a particular event. CreatedDate helps us apply events inside the aggregate in the correct order (starting from the oldest). Moving forward, let’s create a base class for the events generated by the CalendarItem aggregate:


public abstract class CalendarItemBaseEvent : IEvent
{
    public Guid Id { get; set; }

    public Guid AggregateId { get; set; }

    public DateTime CreatedDate { get; }

    protected CalendarItemBaseEvent()
    {
        Id = Guid.NewGuid();
        CreatedDate = DateTime.UtcNow;
    }
}


It’s an abstract class, so we won’t be able to instantiate it directly. As you probably noticed, CalendarItemBaseEvent contains an additional property called Id. That’s because later we’ll store our events as entities in a relational database, which requires a primary key. From the domain’s point of view, the Id property is unnecessary. With the base class in place, we can finally create the concrete events for CalendarItem creation:


public class CalendarItemCreatedEvent : CalendarItemBaseEvent
{
    public string UserId { get; set; }

    public string Name { get; set; }

    public string Description { get; set; }

    public DateTime StartDate { get; set; }

    public DateTime EndDate { get; set; }
}

public class CalendarItemCycleCreatedEvent : CalendarItemBaseEvent
{
    public DateTime StartDate { get; set; }

    public DateTime? EndDate { get; set; }

    public CalendarItemCycleType Type { get; set; }

    public int Interval { get; set; }
}


Implementing missing parts in the Aggregate

Now that we know what events are, we can go back to the code from part 2, where we skipped the implementation of the ApplyChange method in the AggregateRoot class. Before we get to the code, let’s analyze what this method should do. After receiving an event, we should somehow update the relevant properties of the aggregate. Since the AggregateRoot class is supposed to be universal, we can’t rely on a series of “if” statements or on pattern matching over concrete event types. We need something smarter. Therefore, we’ll add this interface to our solution:


public interface IHandle<in TEvent> where TEvent : class, IEvent
{
    void Handle(TEvent @event);
}


Now, here is the trick. Each domain object will implement one IHandle<T> interface for every event that may be generated within the aggregate. The domain object will therefore contain several Handle methods, in which we’ll describe (in code, of course) which properties should be updated when a particular event is applied. Here is the implementation for CalendarItemCreatedEvent:


void IHandle<CalendarItemCreatedEvent>.Handle(CalendarItemCreatedEvent @event)
{
    Id = @event.AggregateId;
    UserId = @event.UserId;
    Name = @event.Name;
    Description = @event.Description;
    StartDate = @event.StartDate;
    EndDate = @event.EndDate;
}
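
The Handle method above is an explicit interface implementation. As a minimal sketch of what that means in practice, the snippet below uses hypothetical stand-ins (Item, NameChangedEvent and ExplicitDemo are not types from the project, and IEvent is reduced to one property) to show that such a method is reachable only through the interface type:

```csharp
using System;

// Simplified stand-ins for the types from this post (assumptions, not project code).
public interface IEvent
{
    Guid AggregateId { get; }
}

public interface IHandle<in TEvent> where TEvent : class, IEvent
{
    void Handle(TEvent @event);
}

// Hypothetical event used only for this illustration.
public class NameChangedEvent : IEvent
{
    public Guid AggregateId { get; set; }
    public string Name { get; set; }
}

// Hypothetical domain object.
public class Item : IHandle<NameChangedEvent>
{
    public string Name { get; private set; }

    // Explicit implementation: Handle is not part of Item's public surface.
    void IHandle<NameChangedEvent>.Handle(NameChangedEvent @event)
    {
        Name = @event.Name;
    }
}

public static class ExplicitDemo
{
    public static string Run()
    {
        var item = new Item();
        var @event = new NameChangedEvent { Name = "Standup" };

        // item.Handle(@event); // would not compile: Handle is not visible on Item

        // The method is reachable only after casting to the interface.
        ((IHandle<NameChangedEvent>)item).Handle(@event);
        return item.Name;
    }
}
```

Calling code that only holds an Item reference therefore can’t apply events by accident; it has to cast to the interface on purpose.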


As you can see, the method is implemented explicitly. That’s because I wanted to hide it (it’s still possible to call it from outside CalendarItem through the interface, but we won’t do it by mistake) while still keeping it in the interface. The Handle method consists only of property assignments. Now we are ready to move on! Here’s what our ApplyChange method is going to look like:


protected void ApplyChange(IEvent @event, bool isNew = true)
{
    var handleType = typeof(IHandle<>).MakeGenericType(@event.GetType());

    var eventType = @event.GetType();

    handleType.GetMethod(nameof(IHandle<IEvent>.Handle), new[] { eventType })
        .Invoke(this, new object[] { @event });

    if(isNew) Events.Add(@event);
}
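
To see the reflection-based dispatch in isolation, here is a small self-contained sketch. It assumes a stripped-down AggregateRoot; Item, RenamedEvent, the UncommittedCount property and the guard that throws when no handler exists are my additions for illustration and are not part of the project:

```csharp
using System;
using System.Collections.Generic;

public interface IEvent
{
    Guid AggregateId { get; }
}

public interface IHandle<in TEvent> where TEvent : class, IEvent
{
    void Handle(TEvent @event);
}

// Stripped-down aggregate root (assumed shape, only what the sketch needs).
public abstract class AggregateRoot
{
    protected List<IEvent> Events { get; } = new List<IEvent>();

    // Hypothetical helper so the result can be inspected from outside.
    public int UncommittedCount => Events.Count;

    protected void ApplyChange(IEvent @event, bool isNew = true)
    {
        var eventType = @event.GetType();

        // Build the closed interface type, e.g. IHandle<RenamedEvent>.
        var handleType = typeof(IHandle<>).MakeGenericType(eventType);

        // Added guard: fail with a readable message when the aggregate has no handler.
        if (!handleType.IsInstanceOfType(this))
            throw new InvalidOperationException(
                $"{GetType().Name} does not handle {eventType.Name}.");

        // Invoke Handle(TEvent) on this instance via the interface method.
        handleType.GetMethod(nameof(IHandle<IEvent>.Handle))
            .Invoke(this, new object[] { @event });

        if (isNew) Events.Add(@event);
    }
}

// Hypothetical event and aggregate used only for this illustration.
public class RenamedEvent : IEvent
{
    public Guid AggregateId { get; set; }
    public string Name { get; set; }
}

public class Item : AggregateRoot, IHandle<RenamedEvent>
{
    public string Name { get; private set; }

    public void Rename(string name)
    {
        ApplyChange(new RenamedEvent { Name = name });
    }

    void IHandle<RenamedEvent>.Handle(RenamedEvent @event)
    {
        Name = @event.Name;
    }
}
```

Keep in mind that an exception thrown inside a Handle method invoked this way surfaces wrapped in a TargetInvocationException, which is worth remembering while debugging.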


The code does nothing more than call the proper Handle method for the received event, and for that we need reflection. Okay, we’ve got this. Now we can finally answer the first question I left you with in the previous part:


What happens inside the CalendarItem after its creation?


The answer is now evident (at least I hope so). We need to create the proper events in the constructor and apply them using the ApplyChange method. The implementation is given below:


public class CalendarItem : AggregateRoot,
    IHandle<CalendarItemCreatedEvent>,
    IHandle<CalendarItemCycleCreatedEvent>
{
    public string UserId { get; private set; }

    public string Name { get; private set; }

    public string Description { get; private set; }

    public DateTime StartDate { get; private set; }

    public DateTime EndDate { get; private set; }

    public List<CalendarItemCycle> Cycles { get; private set; } = new List<CalendarItemCycle>();

    public CalendarItem(Guid id, string userId, string name, string description,
        DateTime startDate, DateTime endDate, IEnumerable<Contracts.Commands.CalendarItemCycle> cycles)
    {
        ApplyChange(new CalendarItemCreatedEvent
        {
            AggregateId = id,
            UserId = userId,
            Name = name,
            Description = description,
            StartDate = startDate,
            EndDate = endDate
        });

        if (cycles == null) return;

        foreach (var cycle in cycles)
        {
            ApplyChange(new CalendarItemCycleCreatedEvent
            {
                AggregateId = id,
                EndDate = cycle.EndDate,
                Interval = cycle.Interval,
                Type = cycle.Type,
                StartDate = cycle.StartDate
            });
        }
    }

    // Parameterless constructor, needed by the new() constraint in the Event Store.
    public CalendarItem()
    {
    }

    //Handle methods
}


If it doesn’t make any sense to you, look at the diagram below which presents it simply:





Event Store

Our next step is to create a database for storing the events: the Event Store. To do that, we are going to use Entity Framework with Microsoft SQL Server 2014, just so we don’t complicate things too much. If you prefer a NoSQL database or another ORM such as NHibernate or SimpleData, feel free to use it. After installing EF, we can begin creating our entities. Before that, let me introduce the structure of our Event Store:




We’ll represent this inheritance using TPH (Table per Hierarchy). In short, TPH lets us create a single table in the database for the whole hierarchy, no matter how “deep” it is. If you are not familiar with this approach, stop reading now and read this article. Having our entities (events), we can define the database context:


public class EventStoreContext : DbContext
{
    public DbSet<CalendarItemBaseEvent> CalendarItemEvents { get; set; }

    static EventStoreContext()
    {
        Database.SetInitializer(new DropCreateDatabaseIfModelChanges<EventStoreContext>());
    }

    public EventStoreContext()
    {
    }

    protected override void OnModelCreating(DbModelBuilder modelBuilder)
    {
        modelBuilder.Entity<CalendarItemBaseEvent>().HasKey(e => e.Id);
    }
}


Nothing special. We registered only the parent class in order to create one big table with a discriminator column. We also marked the Id property as the table’s primary key. We could do the same with the Key attribute, but I wanted to keep CalendarItemBaseEvent clean. Now it’s time to answer the second question from the previous part of the series:


How does the Persist method save an Aggregate’s state into Event Store?


The answer looks as follows:


public class EventStore : IEventStore
{
    EventStoreContext Context { get; }
    IEventBus EventBus { get; }

    public EventStore(EventStoreContext context, IEventBus eventBus)
    {
        Context = context;
        EventBus = eventBus;
    }

    public async Task PersistAsync<TAggregate>(TAggregate aggregate) where TAggregate : class, IAggregateRoot
    {
        var events = aggregate.GetUncommittedEvents();

        foreach (var @event in events)
        {
            var eventType = @event.GetType();

            // Find the generic DbContext.Set<TEntity>() method and close it over the event type.
            var genericSetMethod = Context.GetType().GetMethods().First(m => m.Name == nameof(Context.Set) && m.IsGenericMethod);

            var dbSet = genericSetMethod.MakeGenericMethod(eventType).Invoke(Context, null);

            dbSet.GetType().GetMethod("Add").Invoke(dbSet, new[] { @event });
        }

        await Context.SaveChangesAsync();

        // Publish only after the events have been saved.
        foreach (var @event in events)
            await EventBus.SendAsync(@event);
    }

    public async Task<TAggregate> GetByIdAsync<TAggregate,TEvent>(Guid id)
        where TAggregate : IAggregateRoot, new()
        where TEvent : class, IEvent
    {
        var events = await Context.Set<TEvent>().Where(e => e.AggregateId == id).AsNoTracking().OrderBy(e => e.CreatedDate).ToListAsync();

        if (!events.Any())
            throw new AwesomeCalendarException(AwesomeCalendarExceptionType.AggregateNotFound, typeof(TAggregate));

        var aggregate = new TAggregate();

        // Rebuild the aggregate's state by replaying its stored events.
        aggregate.LoadFromHistory(events);

        return aggregate;
    }
}


Let’s start with the PersistAsync method, which looks kinda messy. That’s because we had to use reflection once again (remember that the Events collection in the AggregateRoot class keeps events as IEvent objects). However, if you take a close look at the code, you’ll notice that it’s nothing more than a typical EF insertion using a DbSet<TEntity> object. Finally, we commit the changes by calling SaveChangesAsync and send each event to the EventBus. The GetByIdAsync method is even simpler. First, we get all of the aggregate’s events from the chosen table (in our case it will be CalendarItemBaseEvent), ordered from the oldest, and throw if there are none. Then we create a new instance of the TAggregate object. To reconstruct its state, we pass all the retrieved events to the LoadFromHistory method. That’s it! The whole mystery has been revealed 😉
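
The body of LoadFromHistory isn’t shown in this part, so here is a minimal sketch of how such a replay could work. It is an assumption rather than the project’s actual code: the dispatch is simplified to a switch inside a hypothetical When method so that the replay logic stays in focus, and Item, CreatedEvent and RenamedEvent are made-up types:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface IEvent
{
    Guid AggregateId { get; }
    DateTime CreatedDate { get; }
}

// Hypothetical events used only for this illustration.
public class CreatedEvent : IEvent
{
    public Guid AggregateId { get; set; }
    public DateTime CreatedDate { get; set; }
    public string Name { get; set; }
}

public class RenamedEvent : IEvent
{
    public Guid AggregateId { get; set; }
    public DateTime CreatedDate { get; set; }
    public string Name { get; set; }
}

public abstract class AggregateRoot
{
    private readonly List<IEvent> _uncommitted = new List<IEvent>();

    public Guid Id { get; protected set; }

    public IReadOnlyCollection<IEvent> GetUncommittedEvents() => _uncommitted.ToList();

    // Assumed shape: replay stored events, oldest first, without recording them again.
    public void LoadFromHistory(IEnumerable<IEvent> history)
    {
        foreach (var @event in history.OrderBy(e => e.CreatedDate))
            ApplyChange(@event, isNew: false);
    }

    protected void ApplyChange(IEvent @event, bool isNew = true)
    {
        When(@event);
        if (isNew) _uncommitted.Add(@event);
    }

    // Simplified dispatch for the sketch; the post uses IHandle<T> plus reflection.
    protected abstract void When(IEvent @event);
}

public class Item : AggregateRoot
{
    public string Name { get; private set; }

    protected override void When(IEvent @event)
    {
        switch (@event)
        {
            case CreatedEvent created:
                Id = created.AggregateId;
                Name = created.Name;
                break;
            case RenamedEvent renamed:
                Name = renamed.Name;
                break;
        }
    }
}
```

The important detail is isNew: false. Replayed events update the state but never land in the uncommitted list, so a later PersistAsync call won’t store them a second time.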



In conclusion, let’s point out the current flow of our application:

  • CreateCalendarItemCommandHandler receives the CreateCalendarItemCommand
  • CreateCalendarItemCommandHandler creates a new instance of the CalendarItem object (passing all Command properties to the constructor)
  • The CalendarItem constructor creates a CalendarItemCreatedEvent, plus a CalendarItemCycleCreatedEvent for each cycle given in the command object, and applies them using the ApplyChange method
  • The ApplyChange method invokes the proper Handle method and adds the received event to the Events collection
  • The Handle method assigns properties based on the received event
  • CreateCalendarItemCommandHandler persists the CalendarItem’s state using the PersistAsync method in the EventStore
  • The PersistAsync method gets all uncommitted events from the CalendarItem object using the GetUncommittedEvents method
  • The PersistAsync method saves each event in the database
  • The PersistAsync method sends each event to the EventBus
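
The flow above can be walked through without a database or a message broker. The sketch below is only an in-memory approximation under my own assumptions: InMemoryEventStore, RecordingEventBus, Item, ItemCreatedEvent, CreateItemCommand and CreateItemCommandHandler are hypothetical names, a plain list stands in for EF, and the Handle step is inlined in the constructor:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public interface IEvent
{
    Guid AggregateId { get; }
}

public class ItemCreatedEvent : IEvent
{
    public Guid AggregateId { get; set; }
    public string Name { get; set; }
}

public interface IEventBus
{
    Task SendAsync(IEvent @event);
}

// Hypothetical bus that only records what was sent.
public class RecordingEventBus : IEventBus
{
    public List<IEvent> Sent { get; } = new List<IEvent>();

    public Task SendAsync(IEvent @event)
    {
        Sent.Add(@event);
        return Task.CompletedTask;
    }
}

// Minimal aggregate: the constructor creates and applies the event.
public class Item
{
    private readonly List<IEvent> _uncommitted = new List<IEvent>();

    public Guid Id { get; private set; }
    public string Name { get; private set; }

    public Item(Guid id, string name)
    {
        var @event = new ItemCreatedEvent { AggregateId = id, Name = name };
        Id = @event.AggregateId;   // what the Handle method would do
        Name = @event.Name;
        _uncommitted.Add(@event);  // what ApplyChange would do for a new event
    }

    public IReadOnlyList<IEvent> GetUncommittedEvents() => _uncommitted.ToList();
}

public class InMemoryEventStore
{
    private readonly IEventBus _bus;

    public List<IEvent> Stored { get; } = new List<IEvent>();

    public InMemoryEventStore(IEventBus bus)
    {
        _bus = bus;
    }

    public async Task PersistAsync(Item item)
    {
        var events = item.GetUncommittedEvents();

        Stored.AddRange(events);          // stands in for Add + SaveChangesAsync

        foreach (var @event in events)    // publish only after saving
            await _bus.SendAsync(@event);
    }
}

public class CreateItemCommand
{
    public Guid Id { get; set; }
    public string Name { get; set; }
}

public class CreateItemCommandHandler
{
    private readonly InMemoryEventStore _store;

    public CreateItemCommandHandler(InMemoryEventStore store)
    {
        _store = store;
    }

    public async Task HandleAsync(CreateItemCommand command)
    {
        var item = new Item(command.Id, command.Name);
        await _store.PersistAsync(item);
    }
}
```

Swapping the list for a real database and the recording bus for a real one doesn’t change the order of the steps, which is the point of the sketch.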


Well, I hope that it’s clear to you. If not, feel free to ask me in the comments below 🙂 In the next episode of our journey, we’ll implement the CommandBus and EventBus using RabbitMQ and the EasyNetQ library. Remember that the whole project is available on GitHub (if you’d like to contribute, go ahead). I also encourage you to follow me on Twitter or Facebook so you don’t miss new posts on my blog!
