Last time I covered strategies which can help you to handle conflicts when synchronizing event streams between the client and the server.
Today I'd like to show you the server side code required to apply these strategies.
The synchronization is exposed as a pair of pull and rebase operations on an MVC controller.
Pull
The pull operation is pretty simple, it allows the client to download events for a given Roster
instance, directly from the event source.
The events are encoded using the cloudevents specification before they are transferred to the client.
public class RosterSynchronization : Controller
{
private readonly EventSourcedRepository _repository;
private readonly IEventSource _eventSource;
public RosterSynchronization(
EventSourcedRepository repository,
IEventSource eventSource)
{
_repository = repository;
_eventSource = eventSource;
}
[HttpGet("api/rosters/{id}/events/pull")]
[Authorize]
public async Task<IActionResult> GetEvents(string id,
int version)
{
var pastEvents = await _eventSource.Load(
typeof(Domain.Roster).Name,
id,
version);
var encoded = pastEvents.EncodeAsCloudEvents(
out var contentType);
Response.ContentType = contentType.MediaType;
await Response.Body.WriteAsync(encoded);
return Ok();
}
Rebase
Out of the different strategies, I tend to prefer the rebase strategy.
This, because a rebase overrules any conflicting events originating from executing the cqrs api's by the changes made in the administrative apps, which are using client side event sourcing.
The rebase operation first decodes the received events from the request body, where they are shaped according to the cloudevents specification, into internal event types, inheriting from SourcedEvent
.
When there are such events, the code will instantiate the aggregate, using an instance of EventSourcedRepository
.
Subsequently it calls the Rebase
operation to invoke the rebasing logic.
As a result of rebasing, certain events may get new parent ids, so these events need to be synchronized back to the client.
The rebased events are encoded as cloud events again and returned to the client, which can overwrite or add them to its local event store.
[HttpPost("api/rosters/{id}/events/rebase")]
[Authorize]
public async Task<IActionResult> Rebase([FromRoute] string id)
{
var events = await Request.Body
.DecodeAsSourcedEventsAsync();
if (events == null) return BadRequest();
var aggregate = await _repository.Get<Domain.Roster>(id);
var rebased = aggregate.Rebase(events);
await _repository.Flush();
var encoded = rebased.EncodeAsCloudEvents(
out var contentType);
Response.ContentType = contentType.MediaType;
await Response.Body.WriteAsync(encoded);
return Ok();
}
}
Back to guide
This article is part of the building offline progressive web apps guide. Return to this guide to explore more aspects of building offline progressive web apps.