How Hevo ingests data from MySQL in Real-Time

5 min read

We, at Hevo, support real time, reliable and loss-less data ingestion from different sources, comprising of databases(mysql, postgres, mongo, dynamodb etc) and enterprise solutions(salesforce, appsflyer, mixpanel etc).  Mysql is one of the most popular and commonly used databases that we support. Hevo uses mysql binary logs to replicate data from Mysql in real-time and at scale.

What is Binary Log

Binary logs or binlogs, are a set of append only log files, which contains events that represent changes made to table data(inserts, updates, deletes etc)  and changes related to table/database creations and schema updates. Its used mainly in Mysql for two purposes.

  • Replication : Mysql achieves data replication to slave servers using binlogs. Slaves read the binlogs from master and then apply the change events to its tables to be in sync with the master.
  • Recovery :  During recovery from backups, the changes made to the database after the last backup time are re-applied to the tables using the binary log change events.
    Binlogs are the most reliable way to replicate data from Mysql as its really fast and guarantees zero data loss.

How Hevo does binlog replication

Hevo taps into the Mysql replication stream of  client servers using the shyiko library, which reads the binlogs and  de-serialises the events into event objects(for eg: UpdateRowsEventData, WriteRowsEventData, DeleteRowsEventData, TableMapEventData etc). These data events are then converted to internal data objects by Hevo and gets published to a kafka topic to be consumed by consumers, which will eventually sync these events to destination.

Challenges with this solution

  1. Keeping table schemas in sync

A change event (create/update/delete events) in Mysql binlogs does not contain the column name or other column metadata(like column type, column length, primary keys etc). As a result, Shyiko also delivers the event as an array of values. To convert this array to a valid data object which can be synced to destination, Hevo needs the schema of the table at the time, this update was made.

Hevo solves this problem by persisting and storing the schemas corresponding to binlog positions, whenever the schema of the table gets changed. When the pipeline is first created, current table schema is fetched from Mysql server and the schema is then kept in sync by listening to the alter/create statements in the binlog. We use JsqlParser to parse the ddl statements and update the schema accordingly. Since the schema versions are stored corresponding to specific binlog positions, the schema corresponding to a previous position can also be fetched and applied, if we decide to move the pipeline offset to a previous binlog position.

This approach has the following limitations

  • We are unable to move the pipeline offset to a position which was written to, before the pipeline was created.
  • All the alter/create statements needs to be handled to keep the schemas in sync. Its very difficult to do this exhaustively, as the sql syntax will keep on changing over a period of time across versions.

    We are evaluating to see if we can leverage TABLE_MAP_EVENT in the binlogs to get the table schemas corresponding to the update, to mitigate these limitations.

2.   Pipeline resumption in the middle of a transaction

TABLE_MAP_EVENT is the event preceding the change event, which contains the metadata about the table which is about to be changed. It contains a table_id and the table metadata corresponding to that table_id.  Every change event in the binary logs will have the table_id , to indicate the table schema corresponding to the event and is used by Shyiko to deserialise the change events. Some interesting facts to consider about table_id, which are not well documented

  • table_id is an in-memory global counter in Mysql and will get reset, if the server is restarted.
  • table_ids are not unique for a table, even when its not restarted and can change when the table schema is altered or even when some columns are renamed. Essentially, it represents a particular table schema and not a table as such.

Its not necessary for every table change event to be preceded by a TABLE_MAP_EVENT. In case of transactions, a set of change events can be grouped together and can share a single TABLE_MAP_EVENT.  The following figure shows the binary log events for a transaction starting at position 645. In this case, there is a single TABLE_MAP_EVENT(at position 719) for the whole transaction.

binary log events for a multi-update transaction

Shyiko keeps the table map events in an in-memory map when a table map event is encountered in the binary logs and uses that to deserialise the subsequent change events. Its possible that Hevo can stop the poll without reading all the events of a transaction and store the offset corresponding to the last read binlog position. When the next poll is started, we pass the last read offset to Shyiko, which will try to read the events from that position and  fail to deserialise the events as it does not have a table map event corresponding to that update.

To solve this, Hevo persists the table map events to a persistent store via a write through cache, along with binlog positions where they are encountered (to handle the case where table ids are reset, when the server restarts). These table map positions are then passed on-demand to Shyiko deserialiser, if it fails to find the table map events in its in-memory map.

What about existing data

When a pipeline is created, Hevo always starts replicating data from the last known position in binlogs, which will capture the updates from the time the pipeline is created.  Now what about data which is already present on the database?

The existing data on the tables is ingested using historical load tasks(one for each table), which will replicate historical data incrementally using an auto detected unique column or a column of user's choice. For example, an auto incrementing id or a primary key would be an ideal candidate to pull up historical data from the tables. We will write about how we achieve this in detail, at a later stage.

Some interesting numbers

The binary log based solution has been in production for a couple of years and around 20+ customers have created around 100+ binlog pipelines on our platform over the years.

Currently, we process close to 25 billion binlog events per month combined across our shared and private environments. Regarding throughput from a single pipeline(which connects to one database server) , we process close to 800K events/minute or 13K events/second from one of our most write-heavy Mysql servers.

What lies ahead

  • As mentioned above, we are evaluating to use TABLE_MAP_EVENT to figure out the schema/metadata(column name, column types, primary keys etc) of a particular change event, which can help us overcome some of the limitations we have right now and also simplify the process and make it more reliable.
  • We are planning to make some optimisations around historical loads, which will enable it to seamlessly scale across tens of thousands of tables and millions of columns across those tables.
Show Comments