Creating read-optimized views is essential for scaling systems. However, transitioning to a stable, denormalized view without causing system downtime is not at all trivial.
This is precisely the challenge we recently faced with Wix Inbox. In this blog post I'll share how we overcame it, along with some lessons we learned along the way that will hopefully make your future transitions easier.
Some Background
Wix Inbox is where Wix users communicate with their site’s visitors and contacts.
We have tens of thousands of daily active Wix users, and terabytes of stored messages across billions of past and ongoing conversations.
Our domain consists of some basic entities:
A conversation, with messages, between 2 participants (the site and the site visitor), as well as the display data that contains the name and image to show in each conversation.
Each of the above entities - conversation, message, participant and display data - has its own table in our MySQL database.
This is a normalized data model - which was the initial state of our system.
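To make that concrete, here is a rough sketch of those entities as code - the field names are illustrative, not our actual schema:

```scala
// A rough sketch of the normalized entities (field names are illustrative, not the actual schema)
case class Conversation(id: String, siteId: String, lastMessageTime: Long, archived: Boolean)
case class Message(id: String, conversationId: String, senderId: String, body: String, sentAt: Long)
case class Participant(conversationId: String, participantId: String, lastReadTime: Long)
case class ConversationDisplayData(conversationId: String, participantId: String, name: String, imageUrl: String)
```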
Our mission was to enable any user to get a list of all of their unread conversations. Achieving that required a heavy query that joined the conversations, participants and users tables to find which conversations had been updated since the last time that user read them.
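To give a feel for it, the query had roughly this shape - a sketch, with illustrative table and column names, embedded the way it would live in application code:

```scala
object UnreadQuery {
  // Roughly the shape of the heavy "unread conversations" query; table and column names are illustrative.
  val sql: String =
    """SELECT c.id
      |FROM conversations c
      |JOIN participants p ON p.conversation_id = c.id
      |JOIN users u        ON u.id = p.user_id
      |WHERE u.id = ?                                -- the requesting user
      |  AND c.site_id = ?
      |  AND c.last_message_time > p.last_read_time  -- updated since the user last read it
      |ORDER BY c.last_message_time DESC""".stripMargin
}
```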
Smells like something’s wrong
As traffic grew, so did the number of queries like that, and we started seeing more and more slow-running queries, especially for busy sites with lots of conversations. Luckily, MySQL allowed us to log and monitor this type of behavior, so we were able to analyze it pretty quickly.
Exacerbating the problem, even when the application server hit its connection timeout and closed the connection, the slow queries kept running on the MySQL server and piled up on top of each other. This naturally destabilized our MySQL instance.
Denormalization to the rescue!
We realized that any optimization would have limited effect and still couldn’t support our growth. We needed to find another solution in order to scale better.
Writing to normalized tables was not the real problem. We also had a lot of simple access patterns (queries) that ran without any issue.
The problem was this specific access pattern - retrieving unread conversations for a given user. Meaning, we needed a dedicated view that was optimized for this case exactly.
This meant that we had to split the read and write sources, and create a read-optimized view.
Here’s the plan:
Writes go to the MySQL databases as before.
For each write, an event should be produced, representing the change that has been made.
The consumer (projection engine) of the event stream maintains and updates the read-optimized view, which is basically a projection of the data that fits the read pattern.
The access pattern at issue will be directed to the projection rather than the MySQL tables.
As this view is optimized for this specific access pattern, we're aiming, from a query complexity standpoint, for constant-time lookups (assuming the result set's size is bounded) as opposed to the super-linear complexity of the MySQL join - thereby reducing load and achieving scalability.
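As a mental model (not our actual storage), the read side boils down to a single lookup by key:

```scala
import scala.collection.concurrent.TrieMap

// A mental model of the read-optimized view: one key-value lookup instead of a multi-table join.
// The in-memory map stands in for whatever store backs the real projection.
final case class ViewKey(siteId: String, userId: String)
final case class UnreadEntry(conversationId: String, lastUpdated: Long)

object UnreadProjection {
  private val store = TrieMap.empty[ViewKey, Vector[UnreadEntry]]

  // Constant-time access by key (the result set itself is bounded), vs. the super-linear join in MySQL.
  def unreadConversations(key: ViewKey): Vector[UnreadEntry] =
    store.getOrElse(key, Vector.empty).sortBy(-_.lastUpdated)

  // The projection engine calls this for every relevant event (more on that below).
  def upsert(key: ViewKey, entry: UnreadEntry): Unit = {
    val rest = store.getOrElse(key, Vector.empty).filterNot(_.conversationId == entry.conversationId)
    store.put(key, entry +: rest) // not atomic - good enough for a sketch
  }
}
```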
The plan above is rather simplistic. In practice this is way more complex and introduces many challenges.
Now, while implementing this architecture is difficult enough in a new system, we had the additional challenge of making this change in a live production system.
Here are 5 of the main challenges we’ve tackled along the way:
Challenge #1 - Getting business events out of the system
Let’s take a basic send message scenario, which consists of the following actions:
Writing the message to the messages DB.
Updating the conversation’s last message time - for it to pop up to the head of the conversation’s list.
Unarchiving the conversation if it was archived.
Marking this conversation as read for the user that sent the message.
Each of the above 4 DB writes is a business event that is highly relevant to our projection table. Also, each event can happen on its own, outside the context of message sending. Meaning, we need to have an event fired for each of them.
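In code terms, that's four distinct event types - something along these lines (the names here are illustrative, not our actual events):

```scala
// Illustrative event types for the four writes above (not the actual event schema)
sealed trait InboxEvent { def timestamp: Long }
final case class MessageSent(siteId: String, senderId: String, conversationId: String, timestamp: Long) extends InboxEvent
final case class ConversationLastMessageTimeUpdated(siteId: String, conversationId: String, timestamp: Long) extends InboxEvent
final case class ConversationUnarchived(siteId: String, conversationId: String, timestamp: Long) extends InboxEvent
final case class ConversationMarkedRead(siteId: String, userId: String, conversationId: String, timestamp: Long) extends InboxEvent
```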
2 Options to do so:
Change Data Capture (CDC) - a software process that identifies and tracks changes to data in a database (usually in near real time). Meaning, as a new database transaction happens, an event is fired.
Emitting events from the application server - right after writing to the database.
Basically, the recommended way to do it is to use CDC. Why? It helps avoid a lot of headaches, along with a bunch of other problems that the 2nd approach introduces:
Consistency - A database write may succeed but producing the event might fail. Meaning, we would end up with an inconsistency between our source of truth (the DB) and the projection. CDC reduces this risk, since events are fired from within the DB write process itself (for instance, after the change is written to the MySQL binlog).
Event Ordering - In a distributed system, a number of instances of the same application receive concurrent requests. Each instance emits events to the same topic, which might cause events in the stream to be out of order. CDC happens at the source DB, which tracks changes in its transaction log in the order they occurred.
Maintainability - Imagine a new developer joining the team and adding another write as a side effect of some business transaction. If they forget to produce an event, it will probably break the projection's integrity.
Dev time - A one-time setup of CDC on the MySQL cluster vs. emitting an event after every write in the code (as well as after every new write added in the future).
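To illustrate the consistency point, here is what the 2nd approach (emitting events from the application server) looks like and where it can silently fail. This is just a sketch - MessagesDao and EventPublisher are hypothetical stand-ins, and MessageSent is the event sketched above:

```scala
// The "emit an event right after the DB write" alternative, with the dual-write gap it introduces.
// MessagesDao and EventPublisher are hypothetical stand-ins, not our real code.
trait MessagesDao    { def insertMessage(conversationId: String, senderId: String, body: String): Unit }
trait EventPublisher { def publish(event: InboxEvent): Unit }

class MessageService(dao: MessagesDao, events: EventPublisher) {
  def sendMessage(siteId: String, senderId: String, conversationId: String, body: String): Unit = {
    dao.insertMessage(conversationId, senderId, body) // 1. the DB write commits...
    // ...if the process crashes or the broker is unreachable right here, the event below is lost
    // and the source of truth and the projection silently diverge. CDC closes this gap.
    events.publish(MessageSent(siteId, senderId, conversationId, System.currentTimeMillis())) // 2. ...but this may never run
  }
}
```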
Challenge #2 - Building a Projection
Now that we have the events, we need to build the projection and maintain it.
Let’s build the data model by examining how we plan to access the data, and what data should be fetched.
Deciding upon the key:
The primary key of the projection is derived from how we query it (read from it). As the projection is optimized for reads, this is one of the most important decisions.
Our access pattern requests the unread conversations for a user in a specific site. Therefore the chosen key was the user_id and the site_id.
Why is this important at this stage? Because this key will also be used for writes and updates in the consumer that consumes the events - meaning, each event has to contain these attributes.
The value / attributes:
The data that needs to be fetched is a list of conversation ids, sorted by the last time they were updated (in descending order).
So... this means that every event also needs to contain these two attributes - the conversation id and its last update time.
Why?
We want to avoid querying MySQL for more data upon each, or even some, of the events. That would introduce more load on MySQL, which completely misses the point.
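Putting the key and the value together, every event has to be self-sufficient for the projection engine - roughly this shape (illustrative, not our actual event schema):

```scala
// Every event carries the projection key (site + user) and the value attributes
// (conversation id + last update time), so the projection engine never has to go back to MySQL.
final case class ConversationChanged(
  siteId: String,          // part of the projection key
  userId: String,          // part of the projection key
  conversationId: String,  // part of the value - the list of conversation ids...
  lastUpdated: Long,       // ...sorted by this, in descending order
  unread: Boolean          // whether this change marks the conversation as unread or as read
)
```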
Challenge #3 - Populating the new projection in a production system
So we know what we want our projection to look like. But we have a live production system, and we need to have a solid plan for populating this read-optimized view.
Meaning, we have to switch the source of the unread conversations query - from the heavy MySQL query to our new lightweight projection - and make sure the right data is there.
Option 1: Lazy - for each user that requests the unread conversations endpoint, we return an answer from MySQL while also populating the projection with a snapshot of the data. From this point on, that user will be served from the read-optimized view.
Option 2: Eager migration - iterate over all users and sites in our system, and for each, populate the read-optimized view with a snapshot from MySQL.
An eager migration seems heavy and also wasteful. Why migrate and keep track of a read-optimized view for users that might potentially never interact with our endpoint?
We chose the first option - of populating the projection lazily.
That means that we have to keep the old (normalized) view forever (or until we decide to eagerly migrate the inactive conversations) and keep paying the old query penalty. Thus, success here will be measured by the hit ratio - what percentage of traffic goes to the projection rather than to the normalized MySQL view (you'll have to read until the end to find that out).
So how is it going to work?
Client asks for unread conversations.
We check the projection table (keyed by user and site):
If the view exists - the unread conversations are returned to the client from it.
If it doesn't exist -
We return them from MySQL.
Then, we emit an event to trigger the creation of the view, i.e. take a snapshot from MySQL, and populate the projection.
From that point on, the projection will be maintained by the projection engine and the events (more on that in the last section).
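Here is how that read path might look in code - a sketch only, with the projection store, the MySQL DAO and the snapshot trigger as hypothetical stand-ins:

```scala
// Lazy population of the projection, sketched with hypothetical collaborators.
trait ProjectionStore  { def get(siteId: String, userId: String): Option[Seq[String]] }
trait ConversationsDao { def unreadConversations(siteId: String, userId: String): Seq[String] } // the heavy MySQL query
trait SnapshotEvents   { def requestSnapshot(siteId: String, userId: String): Unit }            // triggers view creation

class UnreadReader(projection: ProjectionStore, mysql: ConversationsDao, snapshots: SnapshotEvents) {
  def unread(siteId: String, userId: String): Seq[String] =
    projection.get(siteId, userId) match {
      case Some(view) => view                                  // the view exists - serve it directly
      case None =>
        val answer = mysql.unreadConversations(siteId, userId) // fall back to the heavy query...
        snapshots.requestSnapshot(siteId, userId)              // ...and asynchronously create the view
        answer
    }
}
```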
Challenge #4 - Fault Tolerance
So what can go wrong, and how can we deal with it? We’re maintaining a projection for the unread conversations a certain user has in a specific site. Consider this simple scenario:
A visitor sends a message to a site.
The site owner is online and sees the message.
There are two events here. One is the MessageSent event - marking the conversation as unread. The other is the ConversationRead event (read by the user) - marking the conversation as read.
Now, if the ConversationRead event was processed before the MessageSent event - we would have a corrupted view.
We use Apache Kafka for our events. So without getting into Kafka internals, we made sure that events of the same site will always be handled by the same consumer to avoid any race condition of this kind.
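In practice, that typically means using the site id as the record key, so all of a site's events hash to the same partition and are therefore handled in order by a single consumer. A minimal producer sketch (topic name, brokers and payload format are illustrative, not our actual setup):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Minimal sketch: keying records by site id keeps each site's events on one partition.
object SiteKeyedProducer {
  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  private val producer = new KafkaProducer[String, String](props)

  def publish(siteId: String, eventJson: String): Unit =
    producer.send(new ProducerRecord[String, String]("inbox-conversation-events", siteId, eventJson))
}
```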
What would happen if for some reason, the first event fails to update the projection, and the second succeeds? Again, we’ll get a corrupted view.
Couple of options here:
A consumer with a blocking retry policy, meaning it will retry handling a failed event until success, while blocking other events from being processed.
Upon failure, put the failed message in a separate retry topic, handle it there, and in the meanwhile keep streaming and handling the next messages in the queue.
Yes, option one ensures that events are handled in order. But upon failure(s), it will delay every other message in the queue and potentially affect other sites and views, delaying their updates.
Option 2, on the other hand, will keep streaming the next events, but might handle events from the same conversation that happened after the failing event before the failed one is retried - causing corruption.
In order not to affect other sites and views because of a failing event, we chose option 2.
Any ‘failed’ event will go to a separate retry topic and will be handled a bit later.
To mitigate the risk, we decided upon a ‘versioning’ mechanism. Each event will have a timestamp. A projection for each key (user_id + site_id combination) will have both a ‘state’ and a version (the timestamp of the last event that updated it). So if an event for a specific key arrives with a lower timestamp than this key’s projection timestamp, it indicates that there might be corruption, or that events went out of sync. In this case we mark this specific view as Invalid.
For a specific key, a view state can be:
Valid - therefore incoming events will update it and maintain it.
Invalid - it is suspected as out of sync - therefore it is not maintained anymore by the projection engine (events for this key will be ignored).
Not Exist - meaning, no one ever requested this key, therefore it was never created. And more importantly - any incoming event will be dismissed as there’s nothing to update.
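Put together, the projection engine's handling of an incoming event boils down to something like this (a simplified sketch; the types repeat, in minimal form, the ones sketched earlier):

```scala
import scala.collection.mutable

// A simplified sketch of the versioning check. Names and types are illustrative.
final case class ViewKey(siteId: String, userId: String)
final case class ConversationEvent(key: ViewKey, conversationId: String, timestamp: Long, read: Boolean)

sealed trait ViewState
case object Valid   extends ViewState
case object Invalid extends ViewState
// "Not Exist" is simply the absence of an entry in the store.

final case class View(state: ViewState, version: Long, unread: Set[String])

object ProjectionEngine {
  def handle(store: mutable.Map[ViewKey, View], e: ConversationEvent): Unit =
    store.get(e.key) match {
      case None                               => () // Not Exist: nothing to update, the event is dismissed
      case Some(v) if v.state == Invalid      => () // Invalid: no longer maintained, the event is ignored
      case Some(v) if e.timestamp < v.version =>    // out-of-order event: suspected corruption
        store.update(e.key, v.copy(state = Invalid))
      case Some(v)                            =>    // normal case: apply the event and bump the version
        val unread = if (e.read) v.unread - e.conversationId else v.unread + e.conversationId
        store.update(e.key, View(Valid, e.timestamp, unread))
    }
}
```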
Now, let’s use the scenario above as an example:
MessageSent (1) event fails to be handled and goes to a retry queue.
ConversationRead (2) event arrives:
Updates the view - and marks the conversation as read.
Sets the view version to (2).
MessageSent event is re-handled, but view version (2) is larger than this event’s version (1).
View state is set to Invalid.
Next time the user requests it - the data will come from MySQL and the view will recreate itself.
And that is how we “self heal” data corruptions.
Challenge #5 - From zero to hero - gradual rollout to production
Let’s recap for a moment.
A live stream of events updates the projection for valid keys only; events for invalid or non-existing keys are ignored. A read-optimized view for a specific key is created, and valid, only if it was requested by a user.
So the initial state is that we have a live stream of events with an empty view database and 0 read traffic to the optimized view.
As mentioned above, populating the view with a single new entry involved running a relatively heavy query on the MySQL database. This meant that opening the new process to 100% of the read traffic at once would have resulted in a mass of heavy read requests, which would have stressed MySQL and could have crashed it. So we needed to roll out the change gradually. In order to achieve that, we used our experiment system, Wix Petri.
Apart from controlling the load during rollout, another critical aspect was to achieve certainty that the view was created properly and was in fact eventually consistent with the source of truth.
Our solution here was to create a feature toggle that controlled the source of the read response.
The toggle had the following possible values:
The 2 trivial ones:
MySQL - reads from the MySQL database - which was the default initial state.
Read Server - reads from the view and returns an answer; falls back to MySQL if the view does not exist and creates it asynchronously.
But here, we added 2 more options:
Compare And Return From MySQL
Compare And Return From Read Server
Each of those last 2 options returns the response from the specified source and additionally reads from the other source and compares between the two responses. The result of the comparison is then reported as a metric.
The nice thing here is that option 3 (Compare And Return From MySQL) basically makes the whole system work - it populates the view and provides metrics and data - but has practically zero effect on users in production.
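Here is a simplified sketch of how the read path dispatches on the toggle value (the real toggle is managed by Wix Petri; the names and the comparison metric are illustrative):

```scala
// A simplified sketch of dispatching the read on the toggle value.
sealed trait ReadSource
case object MySqlSource                    extends ReadSource // the default initial state
case object ReadServerSource               extends ReadSource
case object CompareAndReturnFromMySql      extends ReadSource
case object CompareAndReturnFromReadServer extends ReadSource

object UnreadEndpoint {
  def unreadConversations(toggle: ReadSource,
                          fromMySql: () => Seq[String],
                          fromReadServer: () => Seq[String],
                          reportMatch: Boolean => Unit): Seq[String] =
    toggle match {
      case MySqlSource      => fromMySql()
      case ReadServerSource => fromReadServer()
      case CompareAndReturnFromMySql =>
        val (a, b) = (fromMySql(), fromReadServer())
        reportMatch(a.toSet == b.toSet) // the comparison result is reported as a metric
        a                               // users still get the MySQL answer - zero production impact
      case CompareAndReturnFromReadServer =>
        val (a, b) = (fromMySql(), fromReadServer())
        reportMatch(a.toSet == b.toSet)
        b
    }
}
```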
This toggle allowed us to distribute traffic into groups, each group of users with a different toggle value.
We opened read traffic gradually - with the comparison option that returns responses from MySQL. 10%... 20%... 30%... Each time we waited a bit before opening further, while monitoring the MySQL source, the comparison metrics and the other metrics we had on each part of this system.
Eventually, we were able to get to 100%, with high confidence and certainty about our new system.
We found the comparison metrics very useful and reassuring, so even after the rollout was done, we decided to have 5% of the requests continue to perform the comparison - to keep monitoring system stability and have alerts on problems that might occur.
To Sum it Up
We achieved a read-optimized view hit rate of approximately 95.5%.
The main advantage of our approach was having the ability to gradually roll it out on a live production system.
The migration, including its design, development and rollout, was far from trivial (it took about 3 months of hard work, trial and error), but the lessons we've learned along the way are priceless and will definitely make future transitions easier when we take these challenges into account ahead of time.
I hope it can also help you with your future transitions! Good luck!
Special thanks to my awesome partners in this journey - Ami Magid, Benny Holtzer and Yuval Levizky.
This post was written by David Vaks