Skip to main content

Ingest data from MySQL CDC

Change Data Capture (CDC) refers to the process of identifying and capturing data changes in a database, then delivering the changes to a downstream service in real time.

RisingWave supports ingesting row-level data (INSERT, UPDATE, and DELETE operations) from the changes of a MySQL database. The supported MySQL versions are 5.7 and 8.0.x.

You can ingest CDC data from MySQL in two ways:

  • Using the native MySQL CDC connector in RisingWave

    With this connector, RisingWave can connect to MySQL databases directly to obtain data from the binlog without starting additional services.

    Beta feature

    The built-in MySQL CDC connector in RisingWave is currently in Beta. Please use with caution as stability issues may still occur. Its functionality may evolve based on feedback. Please report any issues encountered to our team.

  • Using a CDC tool and a message broker

    You can use a CDC tool then use the Kafka, Pulsar, or Kinesis connector to send the CDC data to RisingWave.

This topic describes how to ingest MySQL CDC data into RisingWave using the native MySQL CDC connector. Using an external CDC tool and a message broker is introduced in Create source via event streaming systems.

Set up MySQL

Before using the native MySQL CDC connector in RisingWave, you need to complete several configurations on MySQL.

To use the MySQL CDC features, we need to create a MySQL user account with appropriate privileges on all databases for which RisingWave will read from.

Create a user and grant privileges

  1. Create a MySQL user with the following query.
CREATE USER 'user'@'%' IDENTIFIED BY 'password';
  1. Grant the appropriate privileges to the user.
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'%';
  1. Finalize the privileges.
FLUSH PRIVILEGES;

Enable the binlog

The binlog must be enabled for MySQL replication. The binary logs record transaction updates for replication tools to propagate changes.

  1. Check if the log-bin is already on.
SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | OFF |
+---------------+-------+
  1. If it is OFF, configure your MySQL server configuration file, my.cnf, with the following properties described below. Restart your MySQL server to let the configurations take effect.
server-id         = 223344
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 10
  1. Confirm your changes by checking the log-bin again.
SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+

See Setting up MySQL for more details.

Notes about running RisingWave from binaries

If you are running RisingWave locally from binaries and intend to use the native CDC source connectors or the JDBC sink connector, make sure that you have JDK 11 or later versions is installed in your environment.

EXPERIMENTAL ENHANCEMENT AVAILABLE

We have optimized the data backfilling logic for CDC tables to improve data ingestion performance of the MySQL CDC connector. This is currently an experimental feature, and is not enabled by default. To enable it, run this command in RisingWave:

SET cdc_backfill="true";

Create a table using the native CDC connector in RisingWave

To ensure all data changes are captured, you must create a table and specify primary keys. See the CREATE TABLE command for more details. The data format must be Debezium JSON.

Syntax

CREATE TABLE [ IF NOT EXISTS ] source_name (
column_name data_type PRIMARY KEY , ...
PRIMARY KEY ( column_name, ... )
)
WITH (
connector='mysql-cdc',
<field>=<value>, ...
)
[ FORMAT DEBEZIUM ENCODE JSON ];

Note that a primary key is required.

WITH parameters

All the fields listed below are required.

FieldNotes
hostnameHostname of the database.
portPort number of the database.
usernameUsername of the database.
passwordPassword of the database.
database.nameName of the database. Note that RisingWave cannot read data from a built-in MySQL database, such as mysql, sys, etc.
table.nameName of the table that you want to ingest data from.
server.idOptional. A numeric ID of the database client. It must be unique across all database processes that are running in the MySQL cluster. If not specified, RisingWave will generate a random ID.
transactionalOptional. Specify whether you want to enable transactions for the CDC table that you are about to create. Transactions within a CDC table are currently in Beta. For details, see Transaction within a CDC table.

Data format

Data is in Debezium JSON or Debezium AVRO format. Debezium is a log-based CDC tool that can capture row changes from various database management systems such as PostgreSQL, MySQL, and SQL Server and generate events with consistent structures in real time. The MySQL CDC connector in RisingWave supports JSON or AVRO as the serialization format for Debezium data. The data format does not need to be specified when creating a table with mysql-cdc as the source.

Example

CREATE TABLE orders (
order_id int,
order_date bigint,
customer_name string,
price decimal,
product_id int,
order_status smallint,
PRIMARY KEY (order_id)
) WITH (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '3306',
username = 'root',
password = '123456',
database.name = 'mydb',
table.name = 'orders',
server.id = '5454'
);

Data type mapping

The following table shows the corresponding data type in RisingWave that should be specified when creating a source. For details on native RisingWave data types, see Overview of data types.

RisingWave data types marked with an asterisk indicates that while there is no corresponding RisingWave data type, the ingested data can still be consumed as the listed type.

MySQL typeRisingWave type
BOOLEAN, BOOLBOOLEAN
BIT(1)BOOLEAN*
BIT(>1)No support
TINYINTSMALLINT
SMALLINT[(M)]SMALLINT
MEDIUMINT[(M)]INTEGER
INT, INTEGER[(M)]INTEGER
BIGINT[(M)]BIGINT
REAL[(M,D)]REAL
FLOAT[(P)]REAL
FLOAT(M,D)DOUBLE PRECISION
DOUBLE[(M,D)]DOUBLE PRECISION
CHAR[(M)]CHARACTER VARYING
VARCHAR[(M)]CHARACTER VARYING
BINARY[(M)]BYTEA
VARBINARY[(M)]BYTEA
TINYBLOBBYTEA
TINYTEXTCHARACTER VARYING
BLOBBYTEA
TEXTCHARACTER VARYING
MEDIUMBLOBBYTEA
MEDIUMTEXTCHARACTER VARYING
LONGBLOBBYTEA
LONGTEXTBYTEA or CHARACTER VARYING
JSONJSONB
ENUMCHARACTER VARYING*
SETNo support
YEAR[(2|4)]INTEGER
TIMESTAMP[(M)]TIMESTAMP WITH TIME ZONE
DATEDATE
TIME[(M)]TIME WITHOUT TIME ZONE
DATETIME, DATETIME(0), DATETIME(1), DATETIME(2), DATETIME(3)TIMESTAMP WITHOUT TIME ZONE
DATETIME(4), DATETIME(5), DATETIME(6)TIMESTAMP WITHOUT TIME ZONE
NUMERIC[(M[,D])]NUMERIC
DECIMAL[(M[,D])]NUMERIC
GEOMETRY, LINESTRING, POLYGON,
MULTIPOINT, MULTILINESTRING,
MULTIPOLYGON, GEOMETRYCOLLECTION
STRUCT

Help us make this doc better!