Command Handling - Code Example - Client

C

For the past few editions, I've mainly been focussing on architectural concepts which you can use when building offline progressive web applications.

In the upcoming editions I want to complete this series with a few more code examples showing implementations of those concepts.

Today, I'll start with how I implement command handling in an offline progressive web application based on client side event sourcing.

Handling commands is performed by the aggregate root pattern, which is my go to pattern for command to event transitions.

These transitions can be identified while event modeling.

Client side aggregate root

Roster Management

As an example I have chosen to use the roster management capability again, as it was also used in the cache aside article.

If you may recall from the online offline spectrum post, all of the proposed patterns can be combined to adhere to changes in this spectrum.

I think this concept will be easier to grok when showing example implementations from the same capability.

The command used in this example will be the 'Create' command, which creates a new roster.

A roster is a group of participants that may form a team over a certain period of time.

Rosters are typically created by the team coordination managers several months before the start of a basketball season.

For each future team they may experiment with multiple rosters from which they will ultimately select one, this closer to the start of the season.

After the start of the season, the roster may still change a bit, but more often than not it won't.

As rosters start their life as very lightweight experiments, nothing more than a name is known at creation time.

The aggregate root

An aggregate root is a class that exposes commands, as methods.

It validates invariants, takes decisions and emits those as business events when the decision is meaningfull.

In this case, the Roster class will, when its create command is invoked, first validate if it already has been created before by checking if it has a name.

If it doesn't, it will consider itself created and raise an appropriate event to signal this decission.

If it does it will not do anything.

Aggregate roots only emit meaningfull events!

Deciding to do nothing is not a meaningfull event, so the aggregate will not emit it as a consequence.

Therefore if a roster decides that it had been created before, it will return false instead of emitting an event.

import { EventSourced } from "../messagehandler.eventsourcing.eventsourced.js"
import authentication from "../clubmanagement.authentication.app.js";

class Roster extends EventSourced{

 constructor(id){
  super(id, 
    "https://contracts.clubmanagement.io/rosters/", 
    "io.clubmanagement.contracts.rosters");            
  }

  /* commands */

  create(name){

    if (this.name != null) return false;
    
    super.emit({
        context:
        {
            id: this.id,
            what: "RosterCreated",
            when: new Date(),
            who: authentication.identity.profile.id
        },
        rosterId: this.id,
        name: name
    });
    return true;
  }

  /* restore inner state */

  applyRosterCreated(evt){
      this.id = evt.data.rosterId;
      this.name = evt.data.name;
      this.participations = [];
  }

}

export { Roster }

To make decissions, an aggregate root usually needs its inner state, which can be rebuilt from the events it has emitted in the past.

To reconstruct the inner state, I use a convention by creating applyEventName methods where the EventName part is derived from the context.what property of the emitted event.

Invocation of these apply methods is abstracted into the EventSourced base class.

Note: The parameters passed into the constructor of the base class are related to the cloudevents specification which I use to store events and can be ignored for the sake of this article.

The form

Exposing commands to users is often done via a form.

This form consists of two parts.

First of all the markup, which is included in an html template in a file called create-roster.html.

The only meaningfull items on the markup are a required field for the roster name and the submit button.

<template id="create-roster-form-template">
 <form class="responsive-form">
  <fieldset>
    <legend>Add Roster</legend>			
    <table>
      <tr>
        <td>
          <label for="name">Name</label>
          <input type="text" id="roster-name" name="roster-name" required/>
        </td>
      </tr>
      <tr>
        <td>
          <label for="submit"></label>
          <submit-button>Submit</submit-button>
          <cancel-button>Cancel</cancel-button>
        </td>
      </tr>			
    </table>			
  </fieldset>
 </form>
</template>

The second part is the webcomponent logic which loads the template and binds itself to the form.

When the form is submitted, it will grab a new Roster from the repository and invoke Create on it.

If the invocation completed, it will persist the roster and call flush on the repository.

import templateLoader from "../../dish.shell.template.loader.js";
import app from "../../clubmanagement.roster.admin.app.js";
import guid from "../../dish.shell.guid.generator.js";
import queryString from "../../dish.shell.querystring.js";
import authorization from "../../clubmanagement.authorization.app.js";

class CreateRoster extends HTMLElement {

  constructor() {
    super();
  }

  async connectedCallback() {   
    
    this.innerHTML = "";
    const localFolder = import.meta.url.substring(0, import.meta.url.lastIndexOf("/"));
    const template = await templateLoader.load(localFolder + `/../templates/create-roster.html`, `create-roster-form-template`);
    this.append(template);

    var form = this.querySelector('form');

    form.addEventListener('submit', async (event) => {
      event.preventDefault();

      const name = form.querySelector("#roster-name").value;

      var rosterId = guid.generate();
      var roster = await app.repository.getRoster(rosterId);

      if(roster.create(name)){
        await app.repository.persistRoster(roster);      
        await app.repository.flush();
      }

      this.dispatchEvent(new Event('submitted'));

    }); 

  }
}

export { CreateRoster }

Infrastructure

For all but the first command the above code is the only things to be create.

The first time we need to create some infrastructure as well though.

More specifically the repository and the event store.

The repository

A repository is a class used to mediate between the aggregate root and the underlying event store.

On the client it typically has four methods.

  • populate: Will instruct the store to fill itself up by downloading events from the api.
  • getAggregate: Will get the events previously emitted by the aggregate root and call restoreFrom on a new instance of the root. This in turn will call the applyEventName methods on the root to restore its inner state.
  • persistAggregate: Will request the uncommitted events from the aggregate by calling commit and persist them in the event store as an undispatched event.
  • flush: Will instruct the store to send any undispatched events stored locally to the api.
import app from "../clubmanagement.roster.admin.app.js";

class RosterRepository{

    constructor(eventStore){
        this.eventStore = eventStore;
    }

    async populate(rosterId){
        await this.eventStore.populate(rosterId);
        app.topics.publish("roster.refreshed", rosterId);
    }

    async flush(){
        await this.eventStore.flush();
    }

    async getRoster(rosterId){   
        var events = await this.eventStore.get(rosterId);        			
        var roster = new Roster(rosterId);
        roster.restoreFrom(events);
        return roster;        
    }

    async persistRoster(roster){     
        var events = roster.commit();
        for (const e of events) {
            await this.eventStore.persistEvent(e, false);	
            
            app.topics.publish(e.data.context.what, e);
        }
        app.topics.publish("roster.persisted", roster.id);
    }  
    
}
export { RosterRepository }

Note: topics is an in memory message bus instance, scoped to the capability, to which web components can subscribe and lower level components, like the repository, can publish. This allows web components to rerender themselves whenever the roster got persisted or when the rosters events were refreshed from the api.

The event store

Storing events, in indexeddb in cloudevents format, can be done in a generic way. All this logic has been encapsulated in my MessageHandler frameworks EventStore class.

This store needs to be populated with events before it's usefull.

These events are to be pulled from the capabilities api, which will be a different one per capability.

Undispatched events will also need to be sent back to this api.

Depending on the capability different logic will be required to deal with conflicts that may occuring when replicating events back. In this case I've chosen for the rebase conflict resolution strategy.

All of this logic is encapsulated in a aggregate specific event store, in this case called RosterEventStore.

  • upgrade: Will set up the underlying indexeddb infrastructure.
  • populate: Will take care of pulling the events from the api and storing them locally.
  • get: Returns all the events stored for a specific aggregate instance.
  • persistEvent: Stores an event in the underlying indexeddb infrastructure.
  • flush: Will take care of replicating the undispatched events to the api and applying the rebase strategy. If the flush succeeds it will mark the events as dispatched. If not, e.g. when offline, a new flush will be required in the future.
import { Synchronization } from "../messagehandler.eventsourcing.sync.js";
import { EventStore } from "../messagehandler.eventsourcing.eventstore.js";
import guid from "../dish.shell.guid.generator.js";
import authentication from "../clubmanagement.authentication.app.js";
import config from "./config.js";

class RosterEventStore{

    constructor(db){
        this.uri = config.service + "/api/rosters/";
        this.eventStore = new EventStore(db, 'rosters');
        this.sync = new Synchronization(db);
        this.requestId = null;
        this.flushing = false;
    }

    upgrade(tx){    
        this.sync.upgrade(tx);
        this.eventStore.upgrade(tx);
    }

    async populate(rosterId, requestId){       

        if(this.requestId != null && this.requestId.startsWith(rosterId) && (requestId == null || this.requestId == rosterId + "-" + requestId)){
            return;
        }

        this.requestId = rosterId + "-" + (requestId != null ? requestId : guid.generate());

        var version = await this.sync.get(rosterId);
        if(!version) version = 0;
        
        var uri = this.uri + rosterId + "/events/pull";
        if(version != null){
            uri += "?version=" + version;
        }

        var response = await fetch(uri, {
            method: "GET",
            mode: 'cors',
            headers: {
              "Content-Type": "application/json",
              "Authorization": "Bearer " + authentication.getAccessToken()
            }       
          });
          if(response.status == 200){
            var events = await response.json();

            for (const e of events) {
                if(e.cmsequence > version) version = e.cmsequence;
                await this.eventStore.persistEvent(e, true);
            }

            await this.sync.persist(rosterId, version);
        }
        
    }

    async get(rosterId){   
        return await this.eventStore.get(rosterId);   
    }

    async persistEvent(e, dispatch){     
        await this.eventStore.persistEvent(e, dispatch);
    } 

    async flush(){
        if(this.flushing) return;
        this.flushing = true;

        var entities = await this.eventStore.undispatched(null);
        var toSync = Object.keys(entities);
        
        if(toSync && toSync.length > 0){

            for (const id of toSync) {

                var undispatched = entities[id];

                var uri = this.uri + id + "/events/rebase";

                var request = await fetch(uri, {
                    method: "POST",
                    mode: 'cors',
                    headers: {
                      "Content-Type": "application/cloudevents+json",
                      "Authorization": "Bearer " + authentication.getAccessToken()
                    },
                    body: JSON.stringify(undispatched)        
                  });
               if(request.status == 200){
                  for (const e of undispatched) {
                      await this.eventStore.persistEvent(e, true);		
                  }
               }
            }		            
        }

        this.flushing = false;
    }
}

export { RosterEventStore }

The populate method has a few performance optimizations built in.

  • It tracks requests to download events for a specific roster in the requestId property. Subsequent calls (within the scope of the current page request and store instance) will be ignored unless a different requestId gets passed into the populate method intentionally.
  • It will also only request events that occur later than the last known downloaded event. The version number for this last known downloaded event is tracked in a Synchronization table in indexeddb.

Startup

Finally we'll need to wire all these classes together and initialize them.

Similar to how it was done in the cache aside example, this will happen when the capability initializes.

import dbFactory from "./messagehandler.eventsourcing.indexeddb.factory.js";
import shell from "./dish.shell.js";
import config from "./roster.admin/config.js";
import { Topics } from "./dish.shell.pubsub.js";
import { RosterEventStore } from "./roster.admin/eventstore.js";
import { RosterRepository } from "./roster.admin/repository.js";
import { RosterAdminFeature } from "./clubmanagement.roster.admin.feature.js";

class RosterAdminApp{
    
    constructor(){
        this.name = "Roster Admin";
        this.topics = new Topics();
    }

    async initialize(){             

        var _db = await dbFactory.open(config.dbname, config.dbversion, (db, tx) => {
            new RosterEventStore(db).upgrade(tx);
        });
    
        this.eventStore = new RosterEventStore(_db);
        this.repository = new RosterRepository(this.eventStore);

        shell.features.push(new RosterAdminFeature());  
    }

}

var app = new RosterAdminApp();

shell.apps.push(app);

export default app;

Back to guide

This article is part of the building offline progressive web apps guide. Return to this guide to explore more aspects of building offline progressive web apps.

About the author

YVES GOELEVEN

I've been a software architect for over 20 years.

My main areas of expertise are large scale distributed systems, progressive web applications, event driven architecture, domain driven design, event sourcing, messaging, and the Microsoft Azure platform.

As I've transitioned into the second half of my career, I made it my personal goal to train the next generation of software architects.

Get in touch

Want to get better at software architecture?

Sign up to my newsletter and get regular advice to improve your architecture skills.

You can unsubscribe at any time by clicking the link in the footer of your emails. I use Mailchimp as my marketing platform. By clicking subscribe, you acknowledge that your information will be transferred to Mailchimp for processing. Learn more about Mailchimp's privacy practices here.