
Change Data Capture at DeviantArt

Updated: Aug 17, 2021


DA Intro and Problem description


DeviantArt is a vast social network whose purpose is to entertain, inspire and empower the artist in all of us. It serves more than 50 million users and has more than 1.5 billion page views monthly. As for data-related challenges, we have tens of terabytes of production data in a number of sharded MySQL databases that are replicated in a source-replica topology across different availability zones. Our Business Intelligence (BI) infrastructure handles hundreds of millions of BI events daily, numerous data pipelines serve Artificial Intelligence (AI) and BI use cases, and a multi-cluster search and recommendation system completes the picture - all hosted in the cloud.


Today I want to share our experience of implementing a Change Data Capture (CDC) project that made data at DeviantArt much more accessible for our analytics and AI use cases. Although there are many good articles describing different CDC solutions, and Debezium in particular, here we'd like to focus on the engineering story: how we designed and implemented the architecture, how we orchestrated the connectors, and how we integrated with the Iceberg table format.




Spark of An Idea


A few months ago, our team was discussing ideas for continuously monitoring data integrity between our production databases and our internal search engine. At that point, an established data import pipeline for our search engine already existed, and there were a few options on the table for how continuous data integrity monitoring could be performed, but none of them fully satisfied our requirements.

Later, while brainstorming, it became pretty clear that, given the amount of data we have and the fact that it simply couldn't be snapshotted every single day, the most feasible way would be to use BinLog replication, a.k.a. the Change Data Capture paradigm. There were other strong arguments for having it as a streaming source as well. Giving this idea a deeper thought, we realized that this new stream of data would be highly beneficial for other areas of the product, like real-time personalized recommendations, sentiment analysis of content, etc.

Even though we have an advanced BI infrastructure, there is still tremendous potential inside the data that sits in our production MySQL databases - it was just hardly suitable for OLAP workloads. These ideas strengthened our desire to move forward with streaming production data from the OLTP source (via BinLog replication) into a file format and storage type better suited to OLAP workloads.


We did the research and evaluated a variety of tools that could help with the challenges we were facing, reviewing solutions like AWS DMS, MariaDB MaxScale, and Debezium. Each of them, of course, has its pros and cons, but in the end we decided to build our PoC with Debezium. It's open-source, so there is no vendor lock-in, and we could reuse our Kafka infrastructure. It's also quite mature and has great development and support communities, which we were and are happy to participate in.


For the storage format, we expected it to be a columnar file format, e.g. Apache Parquet, except that it would have to be a bit more robust in terms of supporting transactional and incremental updates. Another requirement was easy interoperability with different massively parallel processing tools, like Spark and Trino (formerly PrestoSQL), and languages like Python and Java. We evaluated the two most prominent options, Delta Lake and Apache Iceberg, and both were up to the task, but after further investigation we decided to proceed with Iceberg for several reasons. First, at Wix, Trino is a first-class citizen for analytics, and Iceberg looked better in terms of integration with Trino and the existing BI infra. Second, we see great potential in the open standard itself.


From a bird’s-eye view, the initial architecture looked like this:



I'll get back to the technical details of the architecture later in the article. But first, let's have a look at the unexpected obstacles we had to overcome before getting into the beautiful CDC (Debezium) world.



Prerequisites / Homework


The first obstacle was to migrate our sharded MariaDB databases, which are replicated in a source-replica topology across different availability zones, back to MySQL - mostly because Debezium had no MariaDB support at the time, and also to bring DeviantArt's databases in line with the rest of Wix.


Next, we needed to enable GTID. A global transaction identifier (GTID) is a unique identifier created for, and associated with, each transaction committed on the server of origin (the source). GTIDs simplify replication and make it easier to fail over to another database server when failures occur.


After that, we switched from mixed (statement- and row-based) BinLogs to fully row-based BinLogs.
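
Before pointing Debezium at a replica, it's worth verifying these prerequisites. Here is a minimal check, assuming the mysql-connector-python package and read access to the server; the host and credentials below are placeholders:

# Quick sanity check of the replication prerequisites described above.
import mysql.connector

conn = mysql.connector.connect(host="db-replica.example.internal",
                               user="monitor", password="***")
cur = conn.cursor()

for variable in ("gtid_mode", "binlog_format", "log_bin"):
    cur.execute("SHOW GLOBAL VARIABLES LIKE %s", (variable,))
    name, value = cur.fetchone()
    print(f"{name} = {value}")   # expect gtid_mode=ON, binlog_format=ROW, log_bin=ON

conn.close()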


And finally, we replaced Mha4MySQL with Orchestrator, a MySQL high availability and replication management tool.


One of the most challenging parts was making sure that all of our SQL queries would work the same way they did before the migration, and that there would be no degradation in performance. To achieve this we ran a regression test: we recorded all the queries from the production database for a few days and replayed them later in a staging MySQL environment. Luckily for us, we only had to make performance alterations to a small percentage of queries, due to a rare MySQL optimizer bug between versions 5.7.29 and 5.7.30. The homework was completed excellently, with no downtime - kudos to our DevOps team!



Architecture, Tools and Components


Most of our compute resources are in the AWS cloud, so when we started this project it was natural to continue using it. Provisioning cloud applications can be a challenging process, so we decided to use the AWS Cloud Development Kit (CDK) for infrastructure as code, instead of the more traditional Terraform and Puppet. This was mainly motivated by the ability to use Python for both infrastructure and application logic. In general, AWS CDK simply provisions your cloud resources in a repeatable manner through AWS CloudFormation, but the benefit is that your logic is expressed in a standard programming language like Python, which makes development much easier and faster: you get code completion, navigation throughout the infrastructure code, the ability to easily deploy a new testing environment, and so on.
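
To give an idea of what that looks like, here is a minimal sketch of a CDK stack in Python (using the CDK v1 API that was current at the time); the stack and resource names are illustrative, not our actual ones:

from aws_cdk import core
from aws_cdk import aws_ecs as ecs

class CdcStack(core.Stack):
    def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        # An ECS cluster for the Kafka Connect workers; CDK creates the
        # underlying VPC and CloudFormation resources for us.
        self.connect_cluster = ecs.Cluster(self, "KafkaConnectCluster")

app = core.App()
CdcStack(app, "deviantart-cdc")   # `cdk deploy` turns this into a CloudFormation stack
app.synth()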


Our production MySQL cluster consists of several sharded databases that are replicated in a source-replica topology across different availability zones. As of now, we use Orchestrator for its HA management, GTID is enabled, and the daily BinLog size is hundreds of gigabytes.


Much more interesting is the setup of our Kafka Connect cluster. Kafka Connect is a distributed tool for scalably and reliably streaming data between Apache Kafka and other data systems - in our case, between MySQL databases (using Debezium as a source connector) and AWS S3 (using the S3 sink connector). We run it on Amazon Elastic Container Service (Amazon ECS) using AWS Fargate, a fully managed container orchestration service. We built custom Docker images for the Debezium and S3 sink connectors and run them as scalable services available via an Application Load Balancer (ALB). This is the central component because it does most of the work: it continuously captures data change events, sends them to the Kafka cluster, then reads the events in micro-batches from the Kafka cluster and, finally, stores them on S3 in compressed JSON lines format.
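
Continuing the CDK sketch above, a Kafka Connect worker fleet behind an ALB can be described roughly like this with the ecs_patterns module; the image tag, Kafka bootstrap servers and topic names below are placeholders rather than our actual values:

# (inside CdcStack.__init__ from the previous sketch)
from aws_cdk import aws_ecs as ecs
from aws_cdk import aws_ecs_patterns as ecs_patterns

connect_service = ecs_patterns.ApplicationLoadBalancedFargateService(
    self, "DebeziumConnect",
    cluster=self.connect_cluster,
    desired_count=2,
    cpu=1024,
    memory_limit_mib=4096,
    task_image_options=ecs_patterns.ApplicationLoadBalancedTaskImageOptions(
        image=ecs.ContainerImage.from_registry("debezium/connect:1.6"),
        container_port=8083,                      # Kafka Connect REST API
        environment={
            "BOOTSTRAP_SERVERS": "kafka-1:9092,kafka-2:9092",
            "GROUP_ID": "debezium-connect",
            "CONFIG_STORAGE_TOPIC": "connect-configs",
            "OFFSET_STORAGE_TOPIC": "connect-offsets",
            "STATUS_STORAGE_TOPIC": "connect-status",
        },
    ),
)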


Both Kafka Connect ECS services, for the Debezium and S3 connectors, are orchestrated by a number of Lambdas and Step Functions. For Debezium, this automation manages the connector configuration for each database: it continuously monitors the Orchestrator API and updates the connectors' configurations after a topology change or a DB failure. The Debezium connector creates one Kafka topic per table.
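
To illustrate the idea, here is roughly what such a (re)configuration looks like through the Kafka Connect REST API, sketched in Python; the host names, credentials and the "db1" alias are placeholders, and the real values come from the Orchestrator API:

import requests

CONNECT_URL = "http://kafka-connect.internal:8083"   # the ALB in front of Kafka Connect

def upsert_debezium_connector(db_alias: str, source_host: str) -> None:
    config = {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": source_host,
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "***",
        "database.server.id": "184054",      # in practice derived from the name - see the server.id note below
        "database.server.name": db_alias,    # topic prefix: <alias>.<database>.<table>
        "database.include.list": db_alias,
        "database.history.kafka.bootstrap.servers": "kafka-1:9092",
        "database.history.kafka.topic": f"schema-changes.{db_alias}",
    }
    # PUT to /connectors/<name>/config creates the connector or updates it in place.
    resp = requests.put(f"{CONNECT_URL}/connectors/debezium-{db_alias}/config",
                        json=config, timeout=30)
    resp.raise_for_status()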

As for the S3 sink, our Lambda syncs the list of Kafka topics with DynamoDB. When a new topic appears in Kafka (representing a table in the DB), it creates a corresponding configuration for the S3 connector. This allows us to have all the necessary data replicated to AWS S3.
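
A per-topic S3 sink configuration generated by such a Lambda could look roughly like this; the bucket name, region and flush settings below are examples, not our production values:

def s3_sink_config(topic: str) -> dict:
    """Sketch of a per-topic configuration for the Confluent S3 sink connector."""
    return {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": topic,
        "s3.bucket.name": "deviantart-cdc-events",   # hypothetical bucket
        "s3.region": "us-east-1",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "s3.compression.type": "gzip",                # compressed JSON lines
        "flush.size": "10000",
    }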


In addition, the automation collects the connectors' running statuses and reports metrics collected via JMX to AWS CloudWatch, so we have proactive alerts for daily monitoring.


Our next component is Spark on Elastic MapReduce (EMR). We have one Spark job per replicated table. Each job reads change events from S3, updates the Iceberg table schema when needed, and stores the data using the Iceberg table format.
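
For illustration, here is a minimal sketch of the idea behind such a job (not our exact code). It assumes the S3 files contain the plain Debezium envelope (before/after/op fields), that each micro-batch holds at most one event per primary key (real jobs deduplicate, keeping the latest event per key), and that an Iceberg catalog named "lake" backed by the Hive Metastore is configured; the paths, table and column names are illustrative:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("cdc-import-users")
         .config("spark.sql.extensions",
                 "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
         .config("spark.sql.catalog.lake", "org.apache.iceberg.spark.SparkCatalog")
         .config("spark.sql.catalog.lake.type", "hive")
         .getOrCreate())

# One micro-batch of change events for a single table, as written by the S3 sink.
spark.read.json("s3://deviantart-cdc-events/topics/db1.users/") \
     .createOrReplaceTempView("changes")

# Apply inserts, updates, snapshot reads ('c', 'u', 'r') and deletes ('d') in one MERGE.
spark.sql("""
    MERGE INTO lake.db1.users t
    USING (
        -- the key comes from 'after' for inserts/updates and from 'before' for deletes
        SELECT coalesce(after.id, before.id) AS id, after, op FROM changes
    ) s
    ON t.id = s.id
    WHEN MATCHED AND s.op = 'd' THEN DELETE
    WHEN MATCHED THEN UPDATE SET t.username = s.after.username
    WHEN NOT MATCHED AND s.op != 'd' THEN
        INSERT (id, username) VALUES (s.id, s.after.username)
""")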


We chose this batch approach over a streaming one because it provides better transparency and manageability for troubleshooting. The Spark jobs maintain a list of processed data files in a DynamoDB table, so it's safe for them to pick up unprocessed files and retry the import when failures occur.
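
This kind of bookkeeping can be made idempotent with a DynamoDB conditional write, so a retried job simply skips files that have already been claimed. A sketch of the idea, with hypothetical table and attribute names:

import boto3
from botocore.exceptions import ClientError

table = boto3.resource("dynamodb").Table("cdc-processed-files")

def claim_file(file_key: str) -> bool:
    """Return True if this job is the first to claim the S3 object."""
    try:
        table.put_item(
            Item={"file_key": file_key, "status": "processing"},
            ConditionExpression="attribute_not_exists(file_key)",
        )
        return True
    except ClientError as err:
        if err.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return False          # already processed (or claimed) by an earlier run
        raise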

EMR cluster creation and job deployment are also managed by Step Functions.



The Iceberg format itself is an open standard. It gives us a number of cool features like schema evolution and serializable isolation, so table changes are atomic and readers never see partial or uncommitted changes. It requires a Hive Metastore, but we already had one at Wix. Another nice feature is that it can easily be added as a Trino catalog. So as soon as data is written to Iceberg, our users are able to access it via the Quix UI (a notebook manager for data exploration) and execute SQL queries to perform complex OLAP analysis over the data lake right away.
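
Once a table lands in the Iceberg catalog, querying it from Trino is plain SQL - inside Quix our users simply type it. For completeness, here is what such a query could look like from Python with the trino client package; the host, catalog, table and column names are illustrative:

import trino

conn = trino.dbapi.connect(host="trino.internal", port=8080,
                           user="analyst", catalog="iceberg", schema="db1")
cur = conn.cursor()
cur.execute("""
    SELECT date_trunc('day', created_at) AS day, count(*) AS new_users
    FROM users
    GROUP BY 1
    ORDER BY 1 DESC
    LIMIT 30
""")
for day, new_users in cur.fetchall():
    print(day, new_users)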


With all of that in place, I would like to stress that this data platform infrastructure makes it very easy to add a new database for change data capture streaming and replication to Iceberg: all you need to do is add a database alias to a configuration.



Troubleshooting and Configuration adjustments


During the project implementation and in production use we encountered several odd issues. Hopefully, our experience will help others save some time.



Initial Snapshot Interruptions


Our databases are quite big, and for some of them it takes up to a week for Debezium to complete its initial snapshot. We faced a situation where such a snapshot could not finish and kept restarting from scratch. That is the documented behaviour during connector failures, e.g. when a node goes down, but in our case it was caused by Kafka Connect rebalancing due to a connector configuration change. So make sure there is no rebalancing during the initial snapshot, or isolate it by creating a separate Kafka Connect cluster when possible.

One more helpful option is "snapshot.delay.ms", which delays the start of the snapshot while Kafka Connect is bootstrapping.



Hash for Debezium server.id param


By default, Debezium presents a random numeric server.id as its replication client ID to MySQL. We decided to use custom logic in the Lambda function that configures the Debezium connectors: it computes a numeric hash of the connector name (which includes the database alias), so the ID stays the same across connector re-configurations.
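
A stable numeric ID can be derived, for example, like this (an illustrative sketch, not our exact code); MySQL server IDs must be non-zero unsigned 32-bit integers, so the hash is folded into that range:

import zlib

def stable_server_id(connector_name: str) -> int:
    # crc32 is deterministic across runs and machines, unlike Python's built-in hash().
    return zlib.crc32(connector_name.encode("utf-8")) % (2**32 - 1) + 1

print(stable_server_id("debezium-db1"))   # same value on every re-configuration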



Memory for S3 sink connector


We run hundreds of S3 sink connectors per worker node, so we faced OutOfMemory exceptions. The fix was to reduce "s3.part.size" to its minimum value of 5 MB.
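
The value is given in bytes, so the override in the connector configuration is:

"s3.part.size": "5242880"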



Encoding Issues


We had a legacy table with a MEDIUMTEXT column in the latin1 character set, but the data inside it was stored in a binary compressed format, so it could contain byte sequences that could not be converted to UTF-8 properly. Since Debezium uses UTF-8 encoding for all text data, some of these special characters were replaced with a "?" mark, which, of course, broke the format.

Our solution was to change the MEDIUMTEXT type to MEDIUMBLOB. As BLOB data types are copied byte by byte, and Debezium encodes BLOB columns with Base64 by default, this solved the issue for us.
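
Downstream consumers then just need to decode the Base64 value back to bytes before applying the application-level decompression; a small illustrative sketch (the event structure and column name are hypothetical):

import base64
import json

def extract_blob(raw_event: str, column: str = "body") -> bytes:
    """Decode one Base64-encoded BLOB column from a Debezium JSON change event."""
    event = json.loads(raw_event)
    return base64.b64decode(event["after"][column])
    # ...the application then decompresses these bytes with its own scheme.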



EMR


We run plenty of small Spark jobs on the EMR cluster - one job per replicated table, each taking from 5 to 30 minutes to process the appended data. This is a somewhat unusual scenario, so we had to adjust the EMR configuration. We increased the share of memory available for YARN ApplicationMasters:

"yarn.scheduler.capacity.maximum-am-resource-percent": "90"

and decreased the memory allocation step:

"yarn.scheduler.minimum-allocation-mb": "256"


 

Business Use Cases / Takeaway


To sum it up, I believe we have achieved our goals. With the help of the CDC project, data from our production databases is now available for analytical workloads and content discovery. We have established new streaming pipelines that help us improve our Machine Learning models and our search and recommendation system, and we have also experimented with sentiment analysis.

Better spam filtering and much more is on the way. So we encourage you to try this approach too, as it opens endless possibilities in the data and software engineering world!


Thank you

 


This post was written by Ruslan Danilin

 
