Creating read-optimized views is essential for scaling systems. However, transitioning to a stable, denormalized view without causing system downtime is not at all trivial.
This is precisely the challenge we recently faced with Wix Inbox. In this blog post I'll share how we were able to overcome it and some lessons we learned along the way to hopefully make your future transitions easier.
Wix Inbox is where Wix users communicate with their site’s visitors and contacts.
We have tens of thousands of daily active Wix users, and terabytes of stored messages across billions of past and ongoing conversations.
Our domain consists of some basic entities:
A conversation, with messages, between 2 participants (the site and the site visitor), as well as the display data that contains the name and image to show in each conversation.
Each of the entities above has its own table in our mySQL database.
This is a normalized data model - which was the initial state of our system.
Our mission was to enable any user to get a list of all of their unread conversations. To achieve that, a heavy query had to be executed. The query would join the conversations, the participants and the users tables to find which conversations were updated since the last time that user had read them.
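To give a feel for the shape of such a query, here is a minimal sketch using Python's built-in sqlite3 module. The table names follow the description above, but every column name and the sample data are assumptions made for illustration, not the actual Wix Inbox schema.

```python
import sqlite3

# Hypothetical, simplified schema; the real tables are not shown in this post.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id TEXT PRIMARY KEY);
    CREATE TABLE conversations (
        id TEXT PRIMARY KEY,
        site_id TEXT NOT NULL,
        last_message_at INTEGER NOT NULL
    );
    CREATE TABLE participants (
        conversation_id TEXT NOT NULL,
        user_id TEXT NOT NULL,
        last_read_at INTEGER NOT NULL
    );
""")
conn.execute("INSERT INTO users VALUES ('owner')")
conn.executemany(
    "INSERT INTO conversations VALUES (?, ?, ?)",
    [("c1", "site-1", 100), ("c2", "site-1", 200), ("c3", "site-1", 300)],
)
conn.executemany(
    "INSERT INTO participants VALUES (?, ?, ?)",
    [("c1", "owner", 150), ("c2", "owner", 150), ("c3", "owner", 0)],
)

# A conversation is unread when it was updated after the user last read it.
UNREAD_QUERY = """
    SELECT c.id
    FROM conversations AS c
    JOIN participants AS p ON p.conversation_id = c.id
    JOIN users AS u ON u.id = p.user_id
    WHERE u.id = ? AND c.site_id = ?
      AND c.last_message_at > p.last_read_at
    ORDER BY c.last_message_at DESC
"""

def unread_conversations(user_id, site_id):
    rows = conn.execute(UNREAD_QUERY, (user_id, site_id)).fetchall()
    return [row[0] for row in rows]

print(unread_conversations("owner", "site-1"))  # ['c3', 'c2']
```

Every request has to join and filter across all of the user's conversations, which is what makes this access pattern expensive for busy sites.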
Smells like something’s wrong
As traffic grew, so did the number of these queries, and we started seeing more and more slow-running ones, especially for busy sites with lots of conversations. Luckily, mySQL allowed us to log and monitor this type of behavior, so we were able to analyze it pretty quickly.
To make matters worse, even when the application server hit the connection timeout and closed the connection, the slow queries kept running on the mySQL server and piled up. This naturally destabilized our mySQL instance.
Denormalization to the rescue!
We realized that any optimization would have limited effect and still couldn’t support our growth. We needed to find another solution in order to scale better.
Writing to normalized tables was not the real problem. We also had a lot of simple access patterns (queries) that accessed data without any issue.
The problem was this specific access pattern - retrieving unread conversations for a given user. Meaning, we needed a dedicated view that was optimized for this case exactly.
This meant that we had to split the read and write sources, and create a read-optimized view.
Here’s the plan:
Writes go to the mySQL databases as before.
For each write, an event should be produced, representing the change that has been made.
The consumer (projection engine) of the event stream maintains and updates the read-optimized view, which is basically a projection of the data that fits the read pattern.
The access pattern at issue will be directed to the projection rather than the mySQL table.
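The four steps above can be sketched in a few lines. This is an in-memory illustration only: the list, queue, and dict stand in for mySQL, the event stream, and the projection store, and the event field names are assumptions.

```python
from collections import defaultdict, deque

source_of_truth = []           # stand-in for the normalized mySQL tables
event_stream = deque()         # stand-in for the event stream
projection = defaultdict(set)  # (user_id, site_id) -> unread conversation ids

def write(change):
    """Steps 1-2: persist the change, then produce an event describing it."""
    source_of_truth.append(change)
    event_stream.append(change)

def run_projection_engine():
    """Step 3: consume events and keep the read-optimized view up to date."""
    while event_stream:
        event = event_stream.popleft()
        key = (event["user_id"], event["site_id"])
        if event["type"] == "MessageSent":
            projection[key].add(event["conversation_id"])
        elif event["type"] == "ConversationRead":
            projection[key].discard(event["conversation_id"])

def unread_conversations(user_id, site_id):
    """Step 4: the read path is a single key lookup in the projection."""
    return sorted(projection.get((user_id, site_id), set()))

base = {"user_id": "owner", "site_id": "site-1"}
write({**base, "type": "MessageSent", "conversation_id": "c1"})
write({**base, "type": "MessageSent", "conversation_id": "c2"})
write({**base, "type": "ConversationRead", "conversation_id": "c1"})
run_projection_engine()
print(unread_conversations("owner", "site-1"))  # ['c2']
```

The important property is that the read path never touches the normalized tables; all the work happens once, at write time, in the projection engine.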
As this view is optimized for this specific access pattern, from a query complexity standpoint, we’re going for constant time (assuming the result set’s size is bounded) as opposed to super-linear complexity in mySQL, thereby reducing load and achieving scalability.
The picture above is rather simplistic. In practice this is way more complex and introduces many challenges.
Now, while implementing this architecture is difficult enough in a new system, we have the additional challenge of making this change in a live production system.
Here are 5 of the main challenges we’ve tackled along the way:
Challenge #1 - Getting business events out of the system
Let’s take a basic send message scenario, which consists of the following actions:
Writing the message to the messages DB.
Updating the conversation’s last message time - for it to pop up to the head of the conversation’s list.
If the conversation was archived, then unarchive it.
Make sure this conversation is marked as read for the user that sent the message.
Each of the above 4 db writes is a business event that is highly relevant to our projection table. Also, each event can happen on its own, outside the context of sending a message. This means we need an event fired for each of them.
There are 2 options for doing so:
Change Data Capture (CDC) - a software process that identifies and tracks changes to data in a database (usually in close to real time). This means that as each new database transaction happens, an event is fired.
Emitting events from the application server - right after writing to the database.
Basically, the recommended way to do it is to use CDC. Why? It helps avoid a lot of headaches and a bunch of problems that the 2nd approach introduces:
Consistency - A database write may succeed while producing the event fails, leaving our source of truth (the db) and the projection inconsistent. CDC reduces this risk because events are fired from within the DB write process (for instance, after the change is written to the mySQL binary log).
Event Ordering - In a distributed system, a number of instances of the same application receive concurrent requests. Each instance emits events to the same topic, which might cause events in the stream to be out of order. CDC, on the other hand, reads from the source DB, which records changes in its transaction log in the order they occurred.
Maintainability - Imagine a new developer joining the team and adding another write as a side effect of some business transaction. If they forget to produce an event, it will probably break the projection's integrity.
Dev time - A one-time setup of CDC on the mySQL cluster vs emitting an event after every write in the code (as well as after every new write added in the future).
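The consistency problem with the 2nd approach is easy to reproduce. The sketch below is a toy simulation: `FlakyProducer` is a hypothetical stand-in for an event producer whose first call fails, and the list stands in for the database.

```python
class FlakyProducer:
    """Hypothetical event producer that fails on its first call."""

    def __init__(self):
        self.sent = []
        self.calls = 0

    def send(self, event):
        self.calls += 1
        if self.calls == 1:
            raise ConnectionError("broker unavailable")
        self.sent.append(event)

db_rows = []
producer = FlakyProducer()

def send_message(message):
    db_rows.append(message)   # 1. the database write succeeds
    producer.send(message)    # 2. producing the event may still fail

try:
    send_message({"conversation_id": "c1", "text": "hi"})
except ConnectionError:
    pass

# The source of truth has the message, but no event ever reached the stream.
print(len(db_rows), len(producer.sent))  # 1 0
```

Retrying the send in application code narrows the window but does not close it, since the process can crash between the two steps.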
Challenge #2 - Building a Projection
Now that we have the events, we need to build the projection and maintain it.
Let’s build the data model by examining how we plan to access the data, and what data should be fetched.
Deciding upon the key:
The primary key of the projection is derived from how we query it (read from it). As the projection is optimized for reads, this is one of the most important decisions.
Our access pattern requests the unread conversations for a user in a specific site. Therefore the chosen key was the user_id and the site_id.
Why is this important at this stage? Because this key will also be used for writes and updates in the consumer that consumes the events - meaning, each event has to contain these attributes.
The value / attributes:
The data that needs to be fetched is a list of conversation ids, sorted by the last time they were updated (in descending order).
So this means that every event also needs to contain these two attributes: the conversation id and the time it was last updated.
We want to avoid querying mySQL for more data upon each, or even some, of the events. That would put more load on mySQL, which completely misses the point.
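Put together, the key and value decisions suggest a data model along these lines. This is a sketch under assumptions: the class and field names are illustrative, and the timestamp is assumed to be an integer such as epoch milliseconds.

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class ProjectionKey:
    user_id: str
    site_id: str

@dataclass(frozen=True)
class ConversationEvent:
    # Key attributes: needed to locate the projection entry.
    user_id: str
    site_id: str
    # Value attributes: needed to update the entry without going back to mySQL.
    conversation_id: str
    last_updated_at: int

@dataclass
class UnreadView:
    # conversation_id -> last update time
    conversations: dict = field(default_factory=dict)

    def apply(self, event):
        self.conversations[event.conversation_id] = event.last_updated_at

    def conversation_ids(self):
        """Conversation ids, most recently updated first."""
        return sorted(self.conversations, key=self.conversations.get, reverse=True)

views = {}

def handle(event):
    key = ProjectionKey(event.user_id, event.site_id)
    views.setdefault(key, UnreadView()).apply(event)

handle(ConversationEvent("owner", "site-1", "c1", 100))
handle(ConversationEvent("owner", "site-1", "c2", 300))
handle(ConversationEvent("owner", "site-1", "c3", 200))
print(views[ProjectionKey("owner", "site-1")].conversation_ids())  # ['c2', 'c3', 'c1']
```

Because every event carries both the key and the value attributes, the handler never needs a round trip to mySQL.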
Challenge #3 - Populating the new projection in a production system
So we know what we want our projection to look like. But we have a live production system, and we need to have a solid plan for populating this read-optimized view.
Meaning, we have to transform the source of the unread conversations query - from the heavy mySQL query - to our new lightweight projection, and make sure the right data is there.
Option 1: Lazy - for each user that requests the unread conversations endpoint, we return an answer from mySQL while also populating the projection with a snapshot of the data. From this point on, a user will be served from the read-optimized view.
Option 2: Eager migration - Iterate over all users and sites in our system, and for each, populate the read-optimized view with a snapshot from mySQL.
An eager migration seems heavy and also wasteful. Why migrate and keep track of a read-optimized view for users that might potentially never interact with our endpoint?
We chose the first option - of populating the projection lazily.
That means that we have to keep the old (normalized) view forever (or until we decide to eagerly migrate the inactive conversations) and keep paying the old query penalty. Thus success here will be measured by the hit ratio, meaning what percent of traffic goes to the projection rather than to the mySQL normalized view (you’ll have to read until the end to find that out).
So how is it going to work?
Client asks for unread conversations.
We look up the view for that user and site in the projection table:
If it exists - it is returned to the client.
If it doesn’t exist -
We return it from mySQL
Then, we emit an event to trigger the creation of the view, i.e. take a snapshot from mySQL, and populate the projection.
From that point on, the projection will be maintained by the projection engine and the events (more on that in the last section).
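Here is a minimal sketch of that lazy read path. The dict and list stand in for the projection table and the event topic, and `query_mysql_unread` is a placeholder that returns canned data instead of running the real heavy query.

```python
projection = {}         # (user_id, site_id) -> unread conversation ids
snapshot_requests = []  # stand-in for the topic carrying "create view" events
mysql_calls = []        # records every time the heavy query runs

def query_mysql_unread(user_id, site_id):
    """Placeholder for the heavy normalized query; returns canned data."""
    mysql_calls.append((user_id, site_id))
    return ["c3", "c2"]

def get_unread_conversations(user_id, site_id):
    key = (user_id, site_id)
    if key in projection:
        return projection[key]                     # hit: served from the view
    result = query_mysql_unread(user_id, site_id)  # miss: fall back to mySQL
    snapshot_requests.append(key)                  # ask for the view to be built
    return result

def handle_snapshot_requests():
    """Consumer side: take a snapshot from mySQL and populate the projection."""
    while snapshot_requests:
        key = snapshot_requests.pop(0)
        projection[key] = query_mysql_unread(*key)

first = get_unread_conversations("owner", "site-1")   # miss
handle_snapshot_requests()
second = get_unread_conversations("owner", "site-1")  # hit
print(first == second, len(mysql_calls))  # True 2
```

In this sketch the heavy query runs twice for a new key (once to answer the client, once for the snapshot) and never again for that key afterwards.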
Challenge #4 - Fault Tolerance
So what can go wrong, and how can we deal with it? We’re maintaining a projection for the unread conversations a certain user has in a specific site. Consider this simple scenario:
A visitor sends a message to a site.
The site owner is online and sees the message.
There are two events here: MessageSent, which marks the conversation as unread, and ConversationRead (by the user), which marks the conversation as read.
Now, if the ConversationRead event was processed before the MessageSent event - we would have a corrupted view.
We use Apache Kafka for our events. So without getting into Kafka internals, we made sure that events of the same site will always be handled by the same consumer to avoid any race condition of this kind.
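One common way to get this guarantee, and it is an assumption here since the details are out of scope for this post, is to use the site id as the message key. Events with the same key land in the same partition, and within a consumer group each partition is read by a single consumer. The sketch below only illustrates the principle; it does not use a Kafka client, and CRC32 is a stand-in for the murmur2 hash used by the Java client's default partitioner.

```python
import zlib

NUM_PARTITIONS = 8  # hypothetical partition count

def partition_for(site_id):
    """Stable hash of the key, modulo the number of partitions."""
    return zlib.crc32(site_id.encode("utf-8")) % NUM_PARTITIONS

events = [
    ("site-1", "MessageSent"),
    ("site-1", "ConversationRead"),
    ("site-2", "MessageSent"),
]
for site_id, event_type in events:
    print(site_id, event_type, "-> partition", partition_for(site_id))
```

Both site-1 events map to the same partition, so they are consumed in the order they were produced.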
What would happen if for some reason, the first event fails to update the projection, and the second succeeds? Again, we’ll get a corrupted view.
There are a couple of options here:
Option 1: A consumer with a blocking retry policy, meaning it will retry handling a failed event until it succeeds, while blocking other events from being processed.
Option 2: Upon failure, put the failed message in a separate retry topic, handle it there, and in the meantime keep streaming and handling the next messages in the queue.
Yes, option 1 ensures that events are handled in order. But upon failure(s) it will delay every other message in the queue and potentially affect other sites and views, delaying their updates.
Option 2, on the other hand, will keep streaming next events, but might handle events from the same conversation that happened after the failing event - causing corruption.
In order not to affect other sites and views because of a failing event, we chose option 2.
Any ‘failed’ event will go to a separate retry topic and will be handled a bit later.
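The trade-off is easy to see in a toy simulation. The two deques below stand in for the main and retry topics, and the handler is a hypothetical one in which the first event fails once before succeeding.

```python
from collections import deque

main_topic = deque(["e1", "e2", "e3"])
retry_topic = deque()
handled = []
failed_once = set()

def handle(event):
    # Hypothetical handler: "e1" fails on its first attempt only.
    if event == "e1" and event not in failed_once:
        failed_once.add(event)
        raise RuntimeError("transient failure")
    handled.append(event)

def drain(topic):
    while topic:
        event = topic.popleft()
        try:
            handle(event)
        except RuntimeError:
            retry_topic.append(event)  # park it and keep the stream moving

drain(main_topic)
drain(retry_topic)
print(handled)  # ['e2', 'e3', 'e1']
```

Nothing was blocked, but e1 ended up being applied after e2 and e3, which is exactly the kind of reordering that can corrupt a view.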
To mitigate the risk, we decided upon a ‘versioning’ mechanism. Each event will have a timestamp. A projection for each key (user_id + site_id combination) will have both a ‘state’ and a version (the timestamp of the last event that updated it). So if an event for a specific key arrives with a lower timestamp than this key’s projection timestamp, it indicates that there might be corruption, or that events went out of sync. In this case we mark this specific view as Invalid.
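A minimal sketch of the version check might look like this. The field names and state labels are assumptions, and timestamps are plain integers for simplicity.

```python
from dataclasses import dataclass, field

@dataclass
class View:
    unread: set = field(default_factory=set)
    version: int = 0      # timestamp of the last event that updated the view
    state: str = "VALID"

views = {}

def apply(event):
    key = (event["user_id"], event["site_id"])
    view = views.setdefault(key, View())
    if view.state == "INVALID":
        return                   # suspected out of sync: no longer maintained
    if event["timestamp"] < view.version:
        view.state = "INVALID"   # older than what was already applied
        return
    if event["type"] == "MessageSent":
        view.unread.add(event["conversation_id"])
    elif event["type"] == "ConversationRead":
        view.unread.discard(event["conversation_id"])
    view.version = event["timestamp"]

base = {"user_id": "owner", "site_id": "site-1", "conversation_id": "c1"}
# The read event (timestamp 20) is processed before the older send event (10).
apply({**base, "type": "ConversationRead", "timestamp": 20})
apply({**base, "type": "MessageSent", "timestamp": 10})
print(views[("owner", "site-1")].state)  # INVALID
```

The check does not repair anything by itself; it only detects that a view can no longer be trusted.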
For a specific key, a view state can be:
Valid - therefore incoming events will update it and maintain it.
Invalid - it is suspected as out of sync - therefore it is not maintained anymore by the projection engine (events