Projections - Code Example - Server

P

So far I've covered how to do command handling in a CQRS oriented api.

In addition I've covered how such an API can be extended to allow offline clients, which process the same commands locally, to synchronize the emitted events back to the API and handle conflicts that can occur in such a scenario.

However there is a third aspect to discuss: projecting the resulting stream of events to a state model and exposing that model through the API.

On the read side of a CQRS oriented APi, I'm using the same patterns on the server as I do on the client: a projection, augmented by a registry for easy lookup.

Projection

My server side projections look slightly different then they did on the client though.

Instead of projecting towards individual Roster instances, I'm logically projecting towards the registry instead.

On the client, my registry was used in combination with the cache aside pattern which downloads state objects from the api, which are later updated by projecting after a command has executed.

That is not the case on the server, there all I have to work with are the events. So both the initial population of the registry as well as updating the state objects after command execution is done in the same way. Using the indexes from the registry as lookup mechanism for these projections is very convenient from a performance perspective.

On top of that, there is a lot more opportunity for concurrently running projections, when multiple roster aggregates are being worked on at the same time through api calls. Hence I find it useful to encapsulate such logic inside the registry.

public class ProjectToRegistry:
  IProjection<RostersRegistry, RosterCreated>,
  IProjection<RostersRegistry, RosterRenamed>
{
  public void Project(RostersRegistry registry, RosterCreated msg)
  {
    var roster = registry.Get(msg.RosterId) ?? new Api.Contract.Roster() { RosterId = msg.RosterId };

    roster.Name = msg.Name;

    registry.Index(roster);
  }

  public void Project(RostersRegistry registry, RosterRenamed msg)
  {
      var roster = registry.Get(msg.RosterId);

      roster.Name = msg.Name;

      registry.Index(roster);
  }
}

Registry

Most of my registry implementations are usually nothing more than a wrapper around ConcurrentDictionary instances that hold references to the objects by a given index.

In the Index method I'll add or update the entity in the indexes.

In the Get methods I'll return the entities matching the correct index and index value.

public class RostersRegistry
{
    private readonly ConcurrentDictionary<string, Roster> _rosters = new ConcurrentDictionary<string, Roster>();
    private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, Roster>> _rostersByOrganizationId = new ConcurrentDictionary<string, ConcurrentDictionary<string, Roster>>();

    public void Index(Roster roster)
    {
        _rosters.AddOrUpdate(roster.RosterId, roster, (id, r) => roster);

        if (!string.IsNullOrEmpty(roster.OrganizationId))
        {
            var byOrganization = _rostersByOrganizationId.GetOrAdd(roster.OrganizationId, i => new ConcurrentDictionary<string, Roster>());

            byOrganization.AddOrUpdate(roster.OrganizationId, roster, (id, r) => roster);
        }
    }

    public Roster Get(string id)
    {
        _rosters.TryGetValue(id, out var roster);
        return roster;
    }

    public IList<Roster> GetByOrganization(string organizationId)
    {
        if (_rostersByOrganizationId.TryGetValue(organizationId, out var rosters))
        {
            return rosters.Values.ToList();
        }
        else
        {
            return new List<Roster>();
        }
    }

    public IList<Roster> All()
    {
        return _rosters.Values.Select(p => p).ToList();
    }    
}

On the client I would sometimes augment indexing by full text search as well.

I tend not to do that on the api side as the total data set is typically much larger.

If I'd need such features I'd store the projected data into an appropriate database, such as Azure Search, instead.

The same goes for data set that might grow extremely large, holding millions of roster objects in memory is not a great idea.

Restore on startup

I'm populating the registry on API startup, using an implementation of an IHostedService.

This will allow the API to service requests immediatly and does not block requests until the registry is fully loaded.

public class Restore : IHostedService
{
    private readonly RostersRegistry _registry;
    private readonly IExecuteProjections _projection;

    public Restore(IExecuteProjections projection, RostersRegistry registry)
    {
        _registry = registry;
        _projection = projection;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await _projection.Restore("Roster", id => _registry);
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }
}

Maintain Session Affinity

When a client makes a get request, shortly after executing a command, it expects the projected model to reflect the changes made as a result of the command.

Other clients usually care less about the exact state and will probably be happy with an eventually consistent model.

This combined concept is called Session Affinity.

To ensure that the projected model gets updated after the events have been flushed from the repository I've implemented an in memory message channel that immediatly applies the projection, on the same api instance, for each of outbound message.

This way the next api request, which likely ends up on the same api instance due to affinity, will receive the latest projected model.

public class MaintainSessionAffinity : MessageHandler.EventSourcing.DomainModel.IChannel
{    
    private RostersRegistry _registry;
    private IExecuteProjections _projection;

    public MaintainSessionAffinity(RostersRegistry registry, IExecuteProjections projection)
    {
        _registry = registry;
        _projection = projection;
    }

    public async Task Push(IEnumerable<SourcedEvent> events)
    {            
        foreach(var e in events)
        {                
            projection.Apply(_registry, e);
        }
    }
}

API

Implementing the API itself now became very straight forward.

Call the correct Get method on the registry to return the requested data straight from memory.

[Route("api/[controller]")]
public class RostersController : Controller
{
    private RostersRegistry _registry;

    public RostersController(RostersRegistry registry)
    {
        _registry = registry;
    }

    [HttpGet("organizations/{orgid}")]
    public async Task<IActionResult> GetByOrganization(string orgid)
    {
        var all = _registry.GetByOrganization(orgid);           
        return Ok(all);
    }
}

Configuration

Wiring up these additional types requires to extend the API's configuration code a little bit:

  • Adding the projection and the message channel to the event sourcing configuration
  • Putting the registry and the restore logic into aspnet's service collection for resolution.
public static IServiceCollection AddEventSource(this IServiceCollection services)
{
    var connectionString = Environment.GetEnvironmentVariable("CUSTOMCONNSTR_azurestoragedata");

    var configuration = new EventsourcingConfiguration();
    var eventSource = new AzureTableStorageEventSource(connectionString, "Rosters");
    configuration.UseContainer(services);
    configuration.UseEventSource(eventSource);

    configuration.EnableProjections(typeof(ProjectToRegistry));
    configuration.UseChannel<MaintainSessionAffinity>();

    services.AddSingleton<RostersRegistry>();
    services.AddSingleton<IHostedService, Restore>();       

    return services;
}

Wrapup

That's all there is to creating an API inside the ClubManagement system, command handling with conflict detection and projection logic.

Everything else, sending emails or notifications, keeping databases up to date, etc... is all considered a side effect that can be handled by other processes in response to events emitted by an API.

As these aspects are not crucial for understanding how the API interacts with offline progressive web apps, I will cover these aspects at some later point.

Back to guide

This article is part of the building offline progressive web apps guide. Return to this guide to explore more aspects of building offline progressive web apps.

About the author

YVES GOELEVEN

I've been a software architect for over 20 years.

My main areas of expertise are large scale distributed systems, progressive web applications, event driven architecture, domain driven design, event sourcing, messaging, and the Microsoft Azure platform.

As I've transitioned into the second half of my career, I made it my personal goal to train the next generation of software architects.

Get in touch

Want to get better at software architecture?

Sign up to my newsletter and get regular advice to improve your architecture skills.

You can unsubscribe at any time by clicking the link in the footer of your emails. I use Mailchimp as my marketing platform. By clicking subscribe, you acknowledge that your information will be transferred to Mailchimp for processing. Learn more about Mailchimp's privacy practices here.