
The Reactive Monolith - How to Move from CRUD to Event Sourcing


Photo by Alex Wong on Unsplash


We’ve all heard this story before: the big monolith that brings so much business value and faithfully serves our customers is starting to slow us down. The product vision is evolving towards reactive features, which means reacting in real time to multiple domain events in the right context. The problem is that our monolith was designed as a classic CRUD system, running business logic synchronously when a state change happens.


This article is the first of a series about how we introduced event sourcing and event-driven architecture to our customer support platform in a way that allowed gradual migration and now provides new business value without putting existing features at risk. While the traditional CRUD approach to system design focuses on state and how it is created, updated and deleted in a distributed environment by multiple users, the event sourcing approach focuses on domain events: when they happen and how they express business intent. In the event sourcing approach, the state is a materialization of the events, and it is just one of many possible usages of the domain events.


A customer support platform is a great use case for reactive capabilities. As agents handle support cases coming from different channels, it is easy to lose track of high priority cases. An event-driven system can track each support case individually and help support agents stay focused on the right case, alerting when other cases require attention. That is just one of many examples. Another example is taking action when too many cases are open for a given category within a certain time period.


Wix Answers is a customer support solution that consolidates support tools like ticketing, help center, and call center into one intuitive platform with advanced built-in automations and analysis capabilities.



How would our system look if we could start over?


If we could start over, we would go with an event sourcing architecture. I won’t go too deep into what event sourcing is; if you want to learn more, I highly suggest you check out this older article by Martin Fowler and also this newer one by Neha Narkhede.


What I love about event sourcing is that it puts domain events front and center. If you listen carefully to your customers explaining their requirements, you will often hear them say things like: “When this happens I want the system to do that”. They are actually speaking in terms of domain events. When, as developers, we understand that our primary goal is to produce domain events, things start to fall into place and we understand the power of event sourcing.


Before we go into what we actually did to make our Monolith reactive, I’ll try to describe what I see as the ideal solution if we could start over without any legacy code. I think that way you’ll be able to better understand the route we took and the compromises we had to make.



This is the general flow of events in an event sourcing architecture: Commands are requested by clients with the intent of mutating the state of some entity (uniquely identified by an entity-id). The commands are processed by Aggregates, which are responsible for accepting or rejecting commands based on the current entity state. If a command is accepted, the aggregate publishes one or more domain events and also updates the current entity state. We have to assume that the aggregate has access to the most up-to-date entity state and that no other process is making decisions about a specific entity id in parallel; otherwise, we face the state consistency issues that are inherent to every distributed system. It follows that the Entity-current-state storage is our source of truth regarding the entity. All other representations of the entity will be eventually consistent, based on event materialization.
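As an illustration only, the aggregate’s contract can be sketched roughly like this (all of the types and method names below are hypothetical, not our actual code):

```java
// Illustrative sketch of the command -> aggregate -> events flow described above.
import java.util.List;

interface Command { String entityId(); }
interface DomainEvent { String entityId(); }

record OrderState(String orderId, String status) {}

interface OrderAggregate {
    // Accept or reject a command based on the current entity state;
    // an accepted command yields one or more domain events.
    List<DomainEvent> handle(Command command, OrderState currentState);

    // Apply an event to produce the next current state - the materialization
    // that is kept in the entity-current-state storage.
    OrderState apply(OrderState currentState, DomainEvent event);
}
```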



Kafka Streams as an event sourcing framework

There are great articles out there about how Kafka Streams can be used to implement event sourcing on top of Kafka. I think there is a lot more to say about this, but that will have to wait for a separate article. For now I will just say that Kafka Streams makes it simple to write a stateful transformation from a command topic to an event topic, using an internal state store for the current entity state. The internal state store is a RocksDB database backed up by a Kafka topic. Kafka Streams provides all the guarantees you would expect from a database: your data is persisted, replicated and saved in a transactional way - in other words, your transformation will publish the events to the downstream topic only if the state was successfully saved in the internal state store and backed up to the internal Kafka topic. That is guaranteed when exactly-once semantics are used. By relying on Kafka partitioning, we guarantee that a specific entity id will always be processed by a single process, which will always have the most up-to-date entity state in its state store.
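To make this concrete, here is a minimal, hypothetical sketch of such a topology. It assumes a recent Kafka Streams version, uses the classic transformValues API, and stands in plain String payloads and placeholder names (order-commands, order-events, entity-state, order-aggregate) for our actual schemas and topics; the decision logic is a stub.

```java
// A minimal, illustrative command -> event topology with an internal state store.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

import java.util.Properties;

public class AggregateTopology {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Internal state store for the current entity state, backed by RocksDB
        // and a changelog topic in Kafka.
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("entity-state"),
                Serdes.String(), Serdes.String()));

        builder.stream("order-commands", Consumed.with(Serdes.String(), Serdes.String()))
                .transformValues(() -> new ValueTransformerWithKey<String, String, String>() {
                    private KeyValueStore<String, String> stateStore;

                    @Override
                    public void init(ProcessorContext context) {
                        stateStore = (KeyValueStore<String, String>) context.getStateStore("entity-state");
                    }

                    @Override
                    public String transform(String entityId, String command) {
                        String currentState = stateStore.get(entityId);
                        // Placeholder decision logic: accept the command, derive the
                        // next state from it, and emit a single event downstream.
                        String nextState = command;
                        stateStore.put(entityId, nextState);
                        return "OrderUpdated:" + entityId;
                    }

                    @Override
                    public void close() {}
                }, "entity-state")
                .to("order-events", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-aggregate");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Exactly-once processing: events reach the downstream topic only if the
        // state store update and its changelog write commit in the same transaction.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

        new KafkaStreams(builder.build(), props).start();
    }
}
```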



How did we introduce domain events into our Monolithic CRUD system?


The first thing we need to ask is: what is the source of truth? Our monolithic system accepts mutation commands via a REST API, updates an entity in MySQL, and then returns the updated entity to the caller.



That makes MySQL the source of truth. We cannot change that without introducing major changes to our monolith and the way it communicates with the client, which will have to become asynchronous. This will lead to major client-side changes.



Change Data Capture (CDC)


Streaming the database binlog to Kafka is a well-known practice aimed at replicating databases. Every change to a table row is saved in the binlog as a record with the previous and current row states, effectively converting every table into a stream that can be materialized into the entity state in a consistent manner. We used the Debezium source connector to stream the binlog to Kafka.


Using a Kafka Streams stateless transformation, we convert each CDC record into a command that is published to the aggregate command topic (a sketch of this mapping follows the list below). We do this for several reasons:

  • There are many cases where we have multiple tables using the entity id as a secondary index. We would like our aggregate to handle all commands that are related to the same id. For example, you may have an “Order” table with a primary key, orderId, and an “OrderLine” table with an orderId column. By converting the Order CDC record to an UpdateOrderCdc command and the OrderLine CDC record to an UpdateOrderLineCdc command, we make sure that the same aggregate will handle these commands and will have access to the most up-to-date entity state.

  • We want to define a schema for all aggregate commands. That schema may begin with CDC update commands, but may evolve into more fine-grained commands that could also be handled by the same aggregate, thus enabling gradual evolution towards a true event sourcing architecture.
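Here is a rough, hypothetical sketch of that stateless mapping. The CDC record shape is a simplified stand-in for the real Debezium payload, the commands are plain strings, and the topic and column names are placeholders:

```java
// Illustrative stateless CDC-record-to-command transformation: re-key by the
// entity id (orderId) and wrap each record as a command on the command topic.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Map;

public class CdcToCommands {

    // Simplified view of a Debezium-style record: the row state before and after the change.
    record CdcRecord(Map<String, String> before, Map<String, String> after) {}

    public static void wire(KStream<String, CdcRecord> orderCdc,
                            KStream<String, CdcRecord> orderLineCdc) {
        // Order rows are already keyed by orderId, but we re-key explicitly so both
        // streams end up in the same partition of the aggregate command topic.
        orderCdc
                .map((key, cdc) -> KeyValue.pair(cdc.after().get("order_id"),
                        "UpdateOrderCdc:" + cdc.after()))
                .to("order-commands", Produced.with(Serdes.String(), Serdes.String()));

        // OrderLine rows are keyed by their own primary key, so re-keying by orderId
        // is what guarantees the same aggregate instance sees both tables.
        orderLineCdc
                .map((key, cdc) -> KeyValue.pair(cdc.after().get("order_id"),
                        "UpdateOrderLineCdc:" + cdc.after()))
                .to("order-commands", Produced.with(Serdes.String(), Serdes.String()));
    }
}
```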

As the aggregate processes commands, it gradually updates the entity state in Kafka. We may recreate the source connector and stream the same table again - however, our aggregate generates events based on the difference between the CDC data and the current entity state retrieved from Kafka. In a way, Kafka becomes the source of truth for our streaming platform, which lives side by side with the Monolith.
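As a simplified illustration of that diff-based approach (field names and event names are made up for the example), the aggregate logic might look roughly like this:

```java
// Hedged sketch: emit events only for actual differences between the CDC-derived
// command and the current state, so replaying the same record yields no new events.
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class OrderEventDiff {

    record OrderState(String orderId, String status) {}
    record UpdateOrderCdc(String orderId, String status) {}

    static List<String> diff(UpdateOrderCdc command, OrderState current) {
        List<String> events = new ArrayList<>();
        if (current == null) {
            // First time we see this entity: the CDC record implies creation.
            events.add("OrderCreated:" + command.orderId());
        } else if (!Objects.equals(current.status(), command.status())) {
            // Only a real change in the stored state produces a domain event.
            events.add("OrderStatusChanged:" + command.orderId()
                    + " " + current.status() + " -> " + command.status());
        }
        return events;
    }
}
```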



CDC records represent committed changes - why aren’t they Events?


The purpose of a CDC feed is to replicate the database in an eventually consistent way, not to generate domain events. It might be tempting to take a CDC record, which contains before and after elements, and transform it into a domain event by performing a diff operation between before and after. However, there are some major drawbacks in relying only on the CDC record.


When we perform a stateless transformation we can’t properly react to CDC records coming from different tables, because there is no ordering guarantee between different tables. We may end up processing the OrderLine record before getting the Order record. A good domain event will provide some Order context as part of the OrderLine event. A stateful transformation allows us to use the aggregate state as storage for the OrderLine and only publish the OrderLine event when the Order data arrives. That is part of the aggregate’s responsibility as the source of events for an entity. Remember that we could not implement the pure architecture; instead, we run in a side-by-side mode next to the Monolith.
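A rough sketch of that buffering idea, with hypothetical types (the real aggregate state and events are richer than this):

```java
// Illustrative only: park OrderLines in the aggregate state until the parent Order
// arrives, so the published OrderLine event can carry Order context.
import java.util.ArrayList;
import java.util.List;

public class OrderLineBuffering {

    record OrderLine(String orderId, String lineId) {}

    // Aggregate state for one orderId: the order status (null until the Order row
    // arrives) plus any OrderLines that arrived before it.
    static class AggregateState {
        String orderStatus;
        final List<OrderLine> pendingLines = new ArrayList<>();
    }

    // OrderLine arrived: if we have no Order context yet, buffer it and emit nothing.
    static List<String> onOrderLine(OrderLine line, AggregateState state) {
        if (state.orderStatus == null) {
            state.pendingLines.add(line);
            return List.of();
        }
        return List.of("OrderLineAdded:" + line.lineId()
                + " (order: " + line.orderId() + ", status: " + state.orderStatus + ")");
    }

    // Order arrived: record its status and flush the parked lines as enriched events.
    static List<String> onOrder(String orderId, String status, AggregateState state) {
        state.orderStatus = status;
        List<String> events = new ArrayList<>();
        events.add("OrderCreated:" + orderId);
        for (OrderLine line : state.pendingLines) {
            events.add("OrderLineAdded:" + line.lineId()
                    + " (order: " + orderId + ", status: " + status + ")");
        }
        state.pendingLines.clear();
        return events;
    }
}
```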



Introduce a Snapshot phase


The binlog will never contain the entire change history of all tables; for that reason, every new CDC connector we configure for a new table will begin with a Snapshot phase. The connector will mark the current position in the binlog, then perform a full table scan and stream the current state of all rows as special CDC records marked with a snapshot flag. That inherently means that in every snapshot we lose domain event information. If the Order status has changed several times over time, the snapshot will only give us the latest status. That is because the goal of the binlog is to replicate state, not to be a backbone for event sourcing. This is where the aggregate state store and the aggregate command topic become critical. We want to design our solution in such a way that a snapshot is only done once per table.


One of the powerful features of event sourcing is the ability to rebuild state or recreate the domain events by doing a playback of historical events or commands. Performing another snapshot is not the right solution here, as a snapshot will guarantee a loss of event information.

If we want to recreate our domain events, we need to reset the consumer offsets of our command topic. The command topic already stores the CDC records packed as commands, with the commands coming from different tables in the correct order (or at least an order our aggregate knows how to handle).
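As a hedged illustration, resetting those offsets could look something like the AdminClient snippet below. The group id, topic name and partition count are placeholders, the aggregate application must be stopped before the reset, and in practice the Kafka Streams application reset tool covers this case (including internal topics).

```java
// Rewind the aggregate's consumer group to the earliest offsets of the command topic.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ReplayCommands {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Placeholder values: the real topic name, partition count and application id differ.
        String commandTopic = "order-commands";
        int partitionCount = 12;
        String applicationId = "order-aggregate"; // a Streams app's group id is its application.id

        List<TopicPartition> partitions = IntStream.range(0, partitionCount)
                .mapToObj(p -> new TopicPartition(commandTopic, p))
                .collect(Collectors.toList());

        try (Admin admin = Admin.create(props)) {
            // Find the earliest available offset for each partition of the command topic.
            Map<TopicPartition, OffsetSpec> earliest = partitions.stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.earliest()));

            Map<TopicPartition, OffsetAndMetadata> resetTo = admin.listOffsets(earliest)
                    .all().get().entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey,
                            e -> new OffsetAndMetadata(e.getValue().offset())));

            // Move the group back so the commands are replayed from the beginning.
            admin.alterConsumerGroupOffsets(applicationId, resetTo).all().get();
        }
    }
}
```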


We have only touched upon the basic steps in the journey of making the Monolith reactive. We discussed how CDC can be used to build a command topic and why CDC records are not commands. Once we have a command topic, we use a stateful transformation to create events, and we can start enjoying the benefits of event sourcing: replaying commands to recreate events and reprocessing events to materialize state.


In the next articles we will discuss more advanced topics:

  • How Kafka Streams is used to express the event sourcing concepts of an aggregate.

  • How we support one-to-many relationships.

  • How events are used to drive reactive applications by repartitioning events.

  • How we reprocess command history to recreate events without downtime for the reactive services that react to the events.

  • And finally, how we run stateful transformations in a multi-DC Kafka setup (hint: mirroring topics is really not enough).


 



This post was written by Jonathan David


 


