We have all heard this story before: the big monolith that brings so much business value and faithfully serves our customers is starting to slow us down. The product vision is evolving towards reactive features, which means reacting in real time to multiple domain events in the right context. The problem is that our monolith was designed as a classic CRUD system, running business logic synchronously whenever a state change happens.
This article is the first of a series about how we introduced event sourcing and event-driven architecture to our customer support platform in a way that allowed gradual migration and now provides new business value without putting existing features at risk. The traditional CRUD approach to system design focuses on state and how multiple users create, update and delete it in a distributed environment. The event sourcing approach instead focuses on domain events, when they happen and how they express business intent. In the event sourcing approach, state is materialized from events, and that materialization is just one of many possible uses of domain events.
A customer support platform is a great use case for reactive capabilities. As agents handle support cases coming from different channels, it is easy to lose track of high-priority cases. An event-driven system can track each support case individually and help support agents stay focused on the right case, alerting them when other cases require attention. Another example is taking action when too many cases in a given category are opened within a certain time period.
Wix Answers is a customer support solution that consolidates support tools like ticketing, help center, and call center into one intuitive platform with advanced built-in automations and analysis capabilities.
How would our system look if we could start over?
If we could start over, we would go with an event sourcing architecture. I won’t go too deep into what event sourcing is; if you want to learn more, I highly suggest you check out this older article by Martin Fowler and this newer one by Neha Narkhede.
What I love about event sourcing is that it puts domain events front and center. If you listen carefully to your customers explaining their requirements, you will often hear them say things like: “When this happens I want the system to do that”. They are actually speaking in terms of domain events. When, as developers, we understand that our primary goal is to produce domain events, things start to fall into place and we understand the power of event sourcing.
Before we go into what we actually did to make our Monolith reactive, I’ll try to describe what I see as the ideal solution if we could start over without any legacy code. I think that way you’ll be able to better understand the route we took and the compromises we had to make.
This is the general flow of events in an event sourcing architecture: clients request commands with the intent of mutating the state of some entity (uniquely identified by an entity id). Commands are processed by aggregates, which accept or reject each command based on the current entity state. If a command is accepted, the aggregate publishes one or more domain events and updates the current entity state. We have to assume that the aggregate has access to the most up-to-date entity state and that no other process is making decisions about the same entity id in parallel; otherwise, we face the state consistency issues inherent to every distributed system. It follows that the entity-current-state storage is our source of truth for the entity. All other representations of the entity are eventually consistent, based on event materialization.
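To make the flow concrete, here is a minimal in-memory sketch of an aggregate in Python. The Order entity, the OpenOrder/CloseOrder commands and the OrderOpened/OrderClosed events are hypothetical names chosen for illustration; the point is only the shape of the logic: a command is accepted or rejected against the current state, and acceptance yields both new events and a new state.

```python
from dataclasses import dataclass, replace
from typing import List, Optional, Tuple

# Hypothetical command and event types for an "Order" entity (illustrative only).
@dataclass(frozen=True)
class Command:
    entity_id: str
    kind: str  # e.g. "OpenOrder" or "CloseOrder"

@dataclass(frozen=True)
class Event:
    entity_id: str
    kind: str  # e.g. "OrderOpened" or "OrderClosed"

@dataclass(frozen=True)
class OrderState:
    entity_id: str
    status: Optional[str] = None  # None means the entity does not exist yet

def handle(state: OrderState, cmd: Command) -> Tuple[OrderState, List[Event]]:
    """Accept or reject a command based on the current state.

    Returns the new state and the events produced. A rejected command
    leaves the state unchanged and produces no events.
    """
    if cmd.kind == "OpenOrder" and state.status is None:
        return replace(state, status="OPEN"), [Event(cmd.entity_id, "OrderOpened")]
    if cmd.kind == "CloseOrder" and state.status == "OPEN":
        return replace(state, status="CLOSED"), [Event(cmd.entity_id, "OrderClosed")]
    return state, []  # rejected: not valid in the current state

# Usage: the aggregate is the only writer of state for a given entity id.
state = OrderState("order-1")
log: List[Event] = []
for c in [Command("order-1", "CloseOrder"),  # rejected: order is not open yet
          Command("order-1", "OpenOrder"),
          Command("order-1", "CloseOrder")]:
    state, events = handle(state, c)
    log.extend(events)

print([e.kind for e in log])  # ['OrderOpened', 'OrderClosed']
print(state.status)           # CLOSED
```

Returning the new state and the events together reflects the requirement that both are committed as one unit; in this sketch a rejected command simply produces no events.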
Kafka Streams as an event sourcing framework
There are great articles out there about how Kafka Streams can be used to implement event sourcing on top of Kafka. There is a lot more to say about this, but that will have to wait for a separate article. For now, I will just say that Kafka Streams makes it simple to write a stateful transformation from a command topic to an event topic, using an internal state store for the current entity state. The internal state store is a RocksDB database backed up by a Kafka changelog topic. Kafka Streams provides the guarantees you would expect from a database: your data is persisted, replicated and saved transactionally. In other words, your transformation publishes events to the downstream topic only if the state was successfully saved in the internal state store and backed up to the internal Kafka topic. This guarantee holds only when exactly-once semantics are enabled. By relying on Kafka partitioning, we guarantee that a specific entity id is always processed by a single stream task, and therefore a single process, and that this task always has the most up-to-date entity state in its state store.
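The partitioning guarantee can be illustrated with a small Python model. It is not the Kafka Streams API: `partition_for` uses `zlib.crc32` as a stand-in for Kafka's default murmur2-based partitioner, `PartitionTask` is a hypothetical stand-in for a stream task with its local state store, and the partition count is an arbitrary assumption. What carries over is that a deterministic key-to-partition mapping sends every command for an entity id to the same task, in order. (Exactly-once processing is a separate concern, enabled in Kafka Streams through the `processing.guarantee` setting.)

```python
import zlib
from collections import defaultdict
from typing import Dict, List

NUM_PARTITIONS = 4  # hypothetical partition count of the command topic

def partition_for(entity_id: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Stand-in for Kafka's default partitioner, which hashes the serialized
    # key with murmur2. Any deterministic hash shows the same property:
    # equal keys always map to the same partition.
    return zlib.crc32(entity_id.encode("utf-8")) % num_partitions

class PartitionTask:
    """Models one stream task: it owns one partition and its local state store."""

    def __init__(self) -> None:
        self.store: Dict[str, List[str]] = defaultdict(list)  # entity id -> commands seen

    def process(self, entity_id: str, command: str) -> None:
        self.store[entity_id].append(command)

tasks = [PartitionTask() for _ in range(NUM_PARTITIONS)]
commands = [("order-1", "Open"), ("order-2", "Open"), ("order-1", "Close")]
for entity_id, cmd in commands:
    tasks[partition_for(entity_id)].process(entity_id, cmd)

# Every command for order-1 landed in exactly one task's store, in order.
owners = [i for i, t in enumerate(tasks) if "order-1" in t.store]
print(len(owners))                        # 1
print(tasks[owners[0]].store["order-1"])  # ['Open', 'Close']
```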
How did we introduce domain events into our Monolithic CRUD system?
The first question to ask is: what is the source of truth? Our monolithic system accepts mutation commands via a REST API, updates an entity in MySQL, and then returns the updated entity to the caller.
That makes MySQL the source of truth. We cannot change that without major changes to our monolith and to the way it communicates with its clients, which would have to become asynchronous and would require major client-side changes.
Change Data Capture (CDC)
Streaming the database binlog to Kafka is a well-known practice for replicating databases. Every change to a table row is saved in the binlog as a record with the previous and current row states, effectively converting every table into a stream that can be materialized into the entity state in a consistent manner. We used the Debezium source connector to stream the binlog to Kafka.
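For reference, a Debezium change event wraps the row in an envelope whose main fields are `before`, `after`, `source`, `op` and `ts_ms`. The dictionary below is a simplified, hypothetical example for an Order row (real events also carry a schema and more source metadata), and `describe` is an illustrative helper, not part of Debezium.

```python
from typing import Any, Dict, Optional

# A simplified Debezium-style change event for one row of a hypothetical
# "Order" table.
cdc_record: Dict[str, Any] = {
    "before": {"id": 42, "status": "OPEN"},   # row state before the change
    "after": {"id": 42, "status": "CLOSED"},  # row state after the change
    "source": {"db": "shop", "table": "Order", "snapshot": "false"},
    "op": "u",  # c = create, u = update, d = delete, r = snapshot read
    "ts_ms": 1_600_000_000_000,
}

OPS = {"c": "create", "u": "update", "d": "delete", "r": "read"}

def describe(record: Dict[str, Any]) -> str:
    """Summarize which row changed and how, using only the record itself."""
    # For deletes "after" is None, so fall back to the "before" image.
    row: Optional[Dict[str, Any]] = record["after"] or record["before"]
    return f'{record["source"]["table"]}#{row["id"]} {OPS[record["op"]]}'

print(describe(cdc_record))  # Order#42 update
```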
Using a stateless Kafka Streams transformation, we convert each CDC record into a command that is published to the aggregate command topic. We do that for several reasons:
There are many cases where multiple tables use the entity id as a secondary index. We want our aggregate to handle all commands related to the same id. For example, you may have an “Order” table with the primary key orderId, and an “OrderLine” table with an orderId column. By converting the Order CDC record into an UpdateOrderCdc command and the OrderLine CDC record into an UpdateOrderLineCdc command, both keyed by orderId, we make sure that the same aggregate handles these commands and has access to the most up-to-date entity state.
We want to define a schema for all aggregate commands. That schema may begin with CDC update commands, but may evolve into more fine-grained commands that could also be handled by the same aggregate, thus enabling gradual evolution towards a true event sourcing architecture.
As the aggregate processes commands, it gradually updates the entity state in Kafka. We may recreate the source connector and stream the same table again; because our aggregate generates events based on the difference between the CDC data and the current entity state retrieved from Kafka, rows that match the stored state do not produce duplicate events. In a way, Kafka becomes the source of truth for our streaming platform, which lives side by side with the Monolith.
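The steps above can be sketched in Python, assuming simplified CDC records that carry only the table name and the `after` row. `to_command` is the stateless step that keys Order rows by their primary key and OrderLine rows by their orderId column; `OrderAggregate` is a hypothetical stateful step, with a dict standing in for the Kafka Streams state store, that emits events only for fields differing from the stored state. The event name `OrderFieldsChanged` is made up for this sketch.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]

def to_command(record: Dict[str, Any]) -> Tuple[Any, Dict[str, Any]]:
    """Stateless step: wrap a CDC row in a command keyed by the order id.

    Order rows are keyed by their primary key and OrderLine rows by their
    orderId column, so both reach the same partition and aggregate.
    """
    table, row = record["table"], record["after"]
    if table == "Order":
        return row["id"], {"type": "UpdateOrderCdc", "row": row}
    if table == "OrderLine":
        return row["orderId"], {"type": "UpdateOrderLineCdc", "row": row}
    raise ValueError(f"unexpected table: {table}")

class OrderAggregate:
    """Stateful step: emits an event only for fields that differ from stored state."""

    def __init__(self) -> None:
        self.store: Dict[Any, Row] = {}  # order id -> current Order row (state store stand-in)

    def handle(self, key: Any, cmd: Dict[str, Any]) -> List[Dict[str, Any]]:
        if cmd["type"] != "UpdateOrderCdc":
            return []  # OrderLine handling is omitted from this sketch
        current = self.store.get(key, {})
        changed = {k: v for k, v in cmd["row"].items() if current.get(k) != v}
        self.store[key] = {**current, **cmd["row"]}
        if not changed:
            return []  # nothing new relative to stored state: no event
        return [{"type": "OrderFieldsChanged", "orderId": key, "changed": changed}]

stream = [
    {"table": "Order", "after": {"id": 1, "status": "OPEN"}},
    {"table": "OrderLine", "after": {"id": 10, "orderId": 1, "sku": "A"}},
]
agg = OrderAggregate()
events = [e for r in stream for e in agg.handle(*to_command(r))]
# Recreating the connector re-streams the same rows; the stored state absorbs them.
replayed = [e for r in stream for e in agg.handle(*to_command(r))]
print(len(events), len(replayed))  # 1 0
```

Both commands share the key `1`, so they would land on the same partition. And because the aggregate compares against its own stored state rather than the record's `before` image, re-streaming the table produces no duplicate events.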
CDC records represent committed changes, so why aren’t they events?
The purpose of a CDC feed is to replicate the database in an eventually consistent way, not to generate domain events. It might be tempting to take a CDC record, which contains before and after elements, and transform it into a domain event by performing a diff between before and after. However, relying only on the CDC record has some major drawbacks.
When we perform a stateless transformation, we can’t properly react to CDC records coming from different tables, because there is no ordering guarantee between tables. We may end up processing the OrderLine record before getting the Order record. A good domain event provides some Order context as part of the OrderLine event. A stateful transformation allows us to use the aggregate state as storage for the OrderLine and to publish the OrderLine event only when the Order data arrives. That is part of the aggregate’s responsibility as the source of events for an entity. Remember that we could not implement the pure architecture; instead, we run in a side-by-side mode.
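A minimal sketch of that buffering, assuming in-memory dicts in place of the aggregate's state store and hypothetical field names (`customer`, `sku`) and event name (`OrderLineAdded`): an OrderLine that arrives before its Order is held in a pending list, and the Order's arrival flushes it as an event enriched with Order context.

```python
from typing import Any, Dict, List

class OrderLineAggregate:
    """Holds OrderLine rows until the parent Order row arrives, so every
    OrderLine event carries Order context. Names are hypothetical."""

    def __init__(self) -> None:
        self.orders: Dict[int, Dict[str, Any]] = {}         # order id -> Order row
        self.pending: Dict[int, List[Dict[str, Any]]] = {}  # order id -> buffered lines

    def _line_event(self, order: Dict[str, Any], line: Dict[str, Any]) -> Dict[str, Any]:
        return {"type": "OrderLineAdded", "orderId": order["id"],
                "customer": order["customer"], "sku": line["sku"]}

    def on_order(self, order: Dict[str, Any]) -> List[Dict[str, Any]]:
        self.orders[order["id"]] = order
        # Flush any lines that arrived before their Order.
        lines = self.pending.pop(order["id"], [])
        return [self._line_event(order, line) for line in lines]

    def on_order_line(self, line: Dict[str, Any]) -> List[Dict[str, Any]]:
        order = self.orders.get(line["orderId"])
        if order is None:
            # No Order context yet: buffer the line and publish nothing.
            self.pending.setdefault(line["orderId"], []).append(line)
            return []
        return [self._line_event(order, line)]

agg = OrderLineAggregate()
# The OrderLine arrives first: there is no ordering guarantee across tables.
early = agg.on_order_line({"id": 10, "orderId": 1, "sku": "A"})
flushed = agg.on_order({"id": 1, "customer": "alice"})
print(early)    # []
print(flushed)  # [{'type': 'OrderLineAdded', 'orderId': 1, 'customer': 'alice', 'sku': 'A'}]
```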
Introduce a Snapshot phase
The binlog never contains the entire change history of all tables; for that reason, every new CDC connector we configure for a new table begins with a snapshot phase. The connector marks the current position in the binlog, then performs a full table scan and streams the current state of every row as a special CDC record marked with a snapshot flag. That inherently means that every snapshot loses domain event information: if an Order’s status has changed several times, the snapshot only gives us the latest status. That is because the goal of the binlog is to replicate state, not to be the backbone of event sourcing. This is where the aggregate state store and the aggregate command topic become critical. We want to design our solution so that a snapshot is done only once per table.
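The information loss is easy to see side by side. In this toy Python example (the status values are hypothetical), the change records expose every transition, while a snapshot, which Debezium emits as read records with `op` set to `"r"`, only sees the row as it is at scan time.

```python
from typing import List

# Hypothetical status history of one Order row, oldest first.
history: List[str] = ["NEW", "OPEN", "PENDING", "CLOSED"]

def events_from_changes(statuses: List[str]) -> List[str]:
    """Binlog-style: one change record per update, so every transition is visible."""
    return [f"{a}->{b}" for a, b in zip(statuses, statuses[1:])]

def events_from_snapshot(statuses: List[str]) -> List[str]:
    """Snapshot-style: a full table scan sees only the row's current state."""
    return [f"snapshot:{statuses[-1]}"]

print(events_from_changes(history))   # ['NEW->OPEN', 'OPEN->PENDING', 'PENDING->CLOSED']
print(events_from_snapshot(history))  # ['snapshot:CLOSED']
```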
One of the powerful features of event sourcing is the ability to rebuild state or recreate domain events by playing back historical events or commands. Performing another snapshot is not the right solution here, as a snapshot guarantees a loss of event information.
If we want to recreate our domain events, we need to reset the consumer offset of our command topic. The command topic packs the CDC records into commands and already stores the commands coming from different tables in the correct order (or in an order that our aggregate knows how to handle).
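Replay can be modeled with the command topic as an append-only list whose index plays the role of the offset. This is a hypothetical in-memory sketch, not consumer code: in a real deployment, offsets are reset with Kafka's tooling, for example `kafka-consumer-groups.sh --reset-offsets`, or the Kafka Streams application reset tool, which also deletes the application's internal topics. The sketch starts each run from an empty state store, since replaying into an already-populated store would make a diff-based aggregate see no changes.

```python
from typing import Dict, List, Tuple

# The command topic modeled as an append-only list; the index is the offset.
command_log: List[Tuple[str, str]] = [
    ("order-1", "Open"), ("order-2", "Open"), ("order-1", "Close"),
]

def run_aggregate(log: List[Tuple[str, str]], start_offset: int = 0) -> List[str]:
    """Process commands from start_offset with a fresh state store and return
    the events produced. Deterministic command handling means the same log
    always yields the same events."""
    state: Dict[str, str] = {}
    events: List[str] = []
    for entity_id, cmd in log[start_offset:]:
        if cmd == "Open" and entity_id not in state:
            state[entity_id] = "OPEN"
            events.append(f"{entity_id}:Opened")
        elif cmd == "Close" and state.get(entity_id) == "OPEN":
            state[entity_id] = "CLOSED"
            events.append(f"{entity_id}:Closed")
    return events

first_run = run_aggregate(command_log)
replayed = run_aggregate(command_log, start_offset=0)  # offset reset to the beginning
print(first_run == replayed)  # True
print(replayed)  # ['order-1:Opened', 'order-2:Opened', 'order-1:Closed']
```

Starting from a later offset with an empty store would lose context (a lone `Close` is rejected), which is why the replay begins at the earliest offset.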
We only touched upon the basic steps in the journey of making the Monolith reactive. We discussed how CDC can be used to build a command topic and why CDC records are not events. Once we have a command topic, we use a stateful transformation to create events, and we can start enjoying the benefits of event sourcing: replaying commands to recreate events and reprocessing events to materialize state.
In the next articles we will discuss more advanced topics:
How Kafka Streams is used to express the event sourcing concepts of an aggregate.
How we support one-to-many relationships.
How events are used to drive reactive applications by repartitioning events.
How we reprocess command history to recreate events without downtime for the reactive services that react to them.
And finally, how we can run stateful transformations in a multi-DC Kafka setup (hint: mirroring topics is really not enough).
References:
Martin Fowler, 2005 <https://martinfowler.com/eaaDev/EventSourcing.html>
Neha Narkhede, 2016 <https://www.confluent.io/blog/event-sourcing-cqrs-stream-processing-apache-kafka-whats-connection>
This post was written by Jonathan David